1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17};
18
19use anyhow::{Context, Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use tracing::{Span, trace_span};
27use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
28use turbo_tasks::{
29 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
30 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
31 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
32 backend::{
33 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
34 TurboTasksExecutionError, TypedCellContent, VerificationMode,
35 },
36 event::{Event, EventDescription, EventListener},
37 message_queue::TimingEvent,
38 registry::get_value_type,
39 scope::scope_and_block,
40 task_statistics::TaskStatisticsApi,
41 trace::TraceRawVcs,
42 turbo_tasks,
43 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
44};
45
46pub use self::{
47 operation::AnyOperation,
48 storage::{SpecificTaskDataCategory, TaskDataCategory},
49};
50#[cfg(feature = "trace_task_dirty")]
51use crate::backend::operation::TaskDirtyCause;
52use crate::{
53 backend::{
54 operation::{
55 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
56 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
57 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
58 TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
59 is_root_node, make_task_dirty_internal, prepare_new_children,
60 },
61 storage::Storage,
62 storage_schema::{TaskStorage, TaskStorageAccessors},
63 },
64 backing_storage::BackingStorage,
65 data::{
66 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
67 InProgressState, InProgressStateInner, OutputValue, TransientTask,
68 },
69 utils::{
70 arc_or_owned::ArcOrOwned,
71 chunked_vec::ChunkedVec,
72 dash_map_drop_contents::drop_contents,
73 dash_map_raw_entry::{RawEntry, raw_entry},
74 ptr_eq_arc::PtrEqArc,
75 shard_amount::compute_shard_amount,
76 sharded::Sharded,
77 swap_retain,
78 },
79};
80
// Threshold above which work over dependent-task sets is parallelized —
// presumably used by the dirty-propagation code outside this chunk; confirm at
// the use sites.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

// Highest bit of `in_progress_operations`. While set, a snapshot has been
// requested and newly starting / suspending operations must wait until the
// snapshot completed (see `start_operation` and `operation_suspend_point`).
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
87
/// Idle timeout, configurable in milliseconds via the
/// `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable.
/// Defaults to 2 seconds when the variable is unset or not a valid `u64`.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
97
/// Bookkeeping for an in-flight snapshot request; guarded by
/// `TurboTasksBackendInner::snapshot_request`.
struct SnapshotRequest {
    // True while a snapshot is being taken. Operations observe this (together
    // with `SNAPSHOT_REQUESTED_BIT`) and suspend themselves until the snapshot
    // completed.
    snapshot_requested: bool,
    // Operations that suspended themselves for the current snapshot; the
    // snapshot persists their state alongside the task data.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
102
103impl SnapshotRequest {
104 fn new() -> Self {
105 Self {
106 snapshot_requested: false,
107 suspended_operations: FxHashSet::default(),
108 }
109 }
110}
111
/// Whether and how the backing storage is used. Any mode restores data from
/// storage (see `should_restore`); only the read-write modes persist changes
/// (see `should_persist`).
pub enum StorageMode {
    /// Data is restored from storage, but no changes are persisted back.
    ReadOnly,
    /// Data is restored from storage and changes are persisted.
    ReadWrite,
    /// Data is restored and persisted; per the name, writing is presumably
    /// deferred until shutdown — confirm against the snapshot scheduling code
    /// outside this chunk.
    ReadWriteOnShutdown,
}
122
/// Configuration for [`TurboTasksBackend`].
pub struct BackendOptions {
    // Whether dependency tracking is enabled. When disabled,
    // `active_tracking` is forced off too (see `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    // Whether activeness of tasks is tracked (see `should_track_activeness`).
    pub active_tracking: bool,

    // How the backing storage is used; `None` disables both restoring and
    // persisting.
    pub storage_mode: Option<StorageMode>,

    // Worker count used to compute the shard amount of internal sharded data
    // structures (see `compute_shard_amount`).
    pub num_workers: Option<usize>,

    // Use smaller preallocations for internal data structures.
    pub small_preallocation: bool,
}
147
148impl Default for BackendOptions {
149 fn default() -> Self {
150 Self {
151 dependency_tracking: true,
152 active_tracking: true,
153 storage_mode: Some(StorageMode::ReadWrite),
154 num_workers: None,
155 small_preallocation: false,
156 }
157 }
158}
159
/// Background jobs this backend schedules — the job runner is outside this
/// chunk; confirm semantics there.
pub enum TurboTasksBackendJob {
    /// The first snapshot job.
    InitialSnapshot,
    /// A snapshot job following an earlier one.
    FollowUpSnapshot,
}
164
/// Public backend handle: a thin, cloneable-by-`Arc` wrapper around the inner
/// backend state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

// Sharded log of newly cached tasks (type -> id) that still need to be
// persisted; drained by `snapshot_and_persist` and handed to the backing
// storage.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
168
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    // Reference point for relative time measurements.
    start_time: Instant,

    // Ids for persisted (cached) tasks; range ends below `TRANSIENT_TASK_BIT`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    // Ids for transient tasks; range starts at `TRANSIENT_TASK_BIT`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Log of new task-cache entries awaiting persistence; `None` when the
    // storage mode doesn't persist.
    persisted_task_cache_log: Option<TaskCacheLog>,
    // Maps a cached task type to its task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    // In-memory task storage.
    storage: Storage,

    // True when a previous run already handed out persistent task ids, i.e.
    // the in-memory state is only a partial view of the database.
    local_is_partial: AtomicBool,

    // Count of currently running operations. The top bit
    // (`SNAPSHOT_REQUESTED_BIT`) is set while a snapshot is requested.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    // Notified when the last active operation suspended itself for a requested
    // snapshot (see `snapshot_and_persist`).
    operations_suspended: Condvar,
    // Notified when a snapshot completed and suspended operations may resume.
    snapshot_completed: Condvar,
    // Time of the last snapshot — presumably relative to `start_time`; confirm
    // at the update site outside this chunk.
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    // Per-function cache hit/miss statistics.
    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
217
218impl<B: BackingStorage> TurboTasksBackend<B> {
219 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
220 Self(Arc::new(TurboTasksBackendInner::new(
221 options,
222 backing_storage,
223 )))
224 }
225
226 pub fn backing_storage(&self) -> &B {
227 &self.0.backing_storage
228 }
229}
230
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state, restoring the next free persistent
    /// task id from the backing storage.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // A task-cache log is only needed when changes get persisted.
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        // Active tracking builds on dependency tracking; force it off when
        // dependency tracking is disabled.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persistent ids occupy the range below the transient bit…
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            // …transient ids the range at and above it.
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            // If ids were already handed out by a previous run, the in-memory
            // state is only a partial view of the database.
            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an execute context (no pre-existing read transaction).
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// Creates an execute context reusing an optional read transaction.
    ///
    /// # Safety
    ///
    /// Forwards to `ExecuteContextImpl::new_with_tx`; the caller must uphold
    /// that constructor's contract for `tx` (presumably that it is a valid,
    /// live transaction of this backend's backing storage — confirm against
    /// its documentation).
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }

    /// True when a snapshot has been requested and running operations should
    /// suspend themselves at the next suspend point.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Suspend point for long-running operations: when a snapshot is
    /// requested, the operation registers its serializable state and blocks
    /// until the snapshot completed.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the request may have completed since
            // the fast-path check.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Decrement the active-operation count; when only the request
                // bit remains, all operations are suspended — wake the
                // snapshotting thread.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Wait until the snapshot completed, then re-register as an
                // active operation.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Registers the start of an operation. When a snapshot is currently
    /// requested, blocks until it completed before returning the guard.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            // Without persistence there is never a snapshot to coordinate
            // with, so no counting is needed.
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                // Back out of the increment and wake the snapshotting thread
                // if we were the last counted operation.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// Whether changes are written back to the backing storage.
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// Whether data is restored from the backing storage at all.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task cache hit in the (optional) statistics.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task cache miss in the (optional) statistics.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
}
391
/// RAII guard that keeps an operation counted in `in_progress_operations` for
/// its lifetime. `backend` is `None` when persisting is disabled and no
/// snapshot coordination is needed.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
395
396impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
397 fn drop(&mut self) {
398 if let Some(backend) = self.backend {
399 let fetch_sub = backend
400 .in_progress_operations
401 .fetch_sub(1, Ordering::AcqRel);
402 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
403 backend.operations_suspended.notify_all();
404 }
405 }
406 }
407}
408
/// Data gathered while preparing completion of a task execution — the
/// producer/consumer live outside this chunk; confirm semantics there.
struct TaskExecutionCompletePrepareResult {
    // Children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    // Whether the task can now be treated as immutable.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // New output value, if the output changed.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
418
419impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Connects `child_task` to `parent_task` (if any), reusing an optional
    /// pre-existing read transaction for the execute context.
    ///
    /// # Safety
    ///
    /// Forwards `tx` to [`Self::execute_context_with_tx`]; the caller must
    /// uphold that function's contract (presumably that `tx` is a valid, live
    /// transaction of this backend's backing storage — confirm against its
    /// documentation).
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: Option<TaskId>,
        child_task: TaskId,
        task_type: Option<ArcOrOwned<CachedTaskType>>,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, task_type, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }
436
437 fn connect_child(
438 &self,
439 parent_task: Option<TaskId>,
440 child_task: TaskId,
441 task_type: Option<ArcOrOwned<CachedTaskType>>,
442 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
443 ) {
444 operation::ConnectChildOperation::run(
445 parent_task,
446 child_task,
447 task_type,
448 self.execute_context(turbo_tasks),
449 );
450 }
451
    /// Tries to read the output of `task_id` on behalf of `reader`.
    ///
    /// Returns `Ok(Ok(vc))` when the output is available, `Ok(Err(listener))`
    /// when the caller has to wait (task in progress, scheduled, or — for
    /// strongly consistent reads — not yet clean), and `Err(_)` when the task
    /// was canceled or errored.
    fn try_read_task_output(
        self: &Arc<Self>,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, None);

        let mut ctx = self.execute_context(turbo_tasks);
        // The reader task guard is only needed when dependencies are tracked,
        // the read is tracked, and the reader differs from the read task.
        // NOTE(review): the `reader.is_some_and(|reader_id| reader_id != task_id)`
        // condition is redundant with the following
        // `let Some(reader_id) = reader && reader_id != task_id` chain.
        let need_reader_task = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && reader.is_some_and(|reader_id| reader_id != task_id)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            Some(reader_id)
        } else {
            None
        };
        // Lock task and reader together (task_pair) to keep lock ordering
        // consistent.
        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // Helper: listener on the task's done event, with a debug note naming
        // the reader.
        fn listen_to_done_event(
            reader_description: Option<EventDescription>,
            tracking: ReadTracking,
            done_event: &Event,
        ) -> EventListener {
            done_event.listen_with_note(move || {
                move || {
                    if let Some(reader_description) = reader_description.as_ref() {
                        format!(
                            "try_read_task_output from {} ({})",
                            reader_description, tracking
                        )
                    } else {
                        format!("try_read_task_output ({})", tracking)
                    }
                }
            })
        }

        // Helper: maps the task's in-progress state to an early result —
        // `Some(Ok(Err(listener)))` when the caller must wait,
        // `Some(Err(_))` when canceled, `None` when not in progress.
        fn check_in_progress(
            task: &impl TaskGuard,
            reader_description: Option<EventDescription>,
            tracking: ReadTracking,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match task.get_in_progress() {
                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
                    listen_to_done_event(reader_description, tracking, done_event),
                ))),
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done_event, ..
                })) => Some(Ok(Err(listen_to_done_event(
                    reader_description,
                    tracking,
                    done_event,
                )))),
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    task.get_task_description()
                ))),
                None => None,
            }
        }

        if matches!(options.consistency, ReadConsistency::Strong) {
            // Strongly consistent reads require the task to be a root of the
            // aggregation graph; promote it until it is. The guards must be
            // dropped around the aggregation update and re-acquired after.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                drop(reader_task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                (task, reader_task) = if let Some(reader_id) = need_reader_task {
                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
                    (task, Some(reader))
                } else {
                    (ctx.task(task_id, TaskDataCategory::All), None)
                }
            }

            let is_dirty = task.is_dirty();

            let has_dirty_containers = task.has_dirty_containers();
            if has_dirty_containers || is_dirty.is_some() {
                // Not clean yet: mark the task active-until-clean and wait on
                // its all-clean event.
                let activeness = task.get_activeness_mut();
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                let activeness = if let Some(activeness) = activeness {
                    activeness.set_active_until_clean();
                    activeness
                } else {
                    // Task was not active before: dirty containers (and the
                    // task itself) need to be scheduled when activeness is
                    // tracked.
                    if ctx.should_track_activeness() {
                        task_ids_to_schedule = task.dirty_containers().collect();
                        task_ids_to_schedule.push(task_id);
                    }
                    let activeness =
                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
                    activeness.set_active_until_clean();
                    activeness
                };
                // The listener note lazily renders a full dirty-subgraph dump
                // for debugging stalled strongly consistent reads.
                let listener = activeness.all_clean_event.listen_with_note(move || {
                    let this = self.clone();
                    let tt = turbo_tasks.pin();
                    move || {
                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
                        let mut ctx = this.execute_context(tt);
                        let mut visited = FxHashSet::default();
                        // Indents every line of a multi-line string.
                        fn indent(s: &str) -> String {
                            s.split_inclusive('\n')
                                .flat_map(|line: &str| [" ", line].into_iter())
                                .collect::<String>()
                        }
                        // Recursively renders a task and its dirty containers.
                        fn get_info(
                            ctx: &mut impl ExecuteContext<'_>,
                            task_id: TaskId,
                            parent_and_count: Option<(TaskId, i32)>,
                            visited: &mut FxHashSet<TaskId>,
                        ) -> String {
                            let task = ctx.task(task_id, TaskDataCategory::All);
                            let is_dirty = task.is_dirty();
                            let in_progress =
                                task.get_in_progress()
                                    .map_or("not in progress", |p| match p {
                                        InProgressState::InProgress(_) => "in progress",
                                        InProgressState::Scheduled { .. } => "scheduled",
                                        InProgressState::Canceled => "canceled",
                                    });
                            let activeness = task.get_activeness().map_or_else(
                                || "not active".to_string(),
                                |activeness| format!("{activeness:?}"),
                            );
                            let aggregation_number = get_aggregation_number(&task);
                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
                            {
                                let uppers = get_uppers(&task);
                                !uppers.contains(&parent_task_id)
                            } else {
                                false
                            };

                            let has_dirty_containers = task.has_dirty_containers();

                            let task_description = task.get_task_description();
                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
                                format!(", dirty({parent_priority})")
                            } else {
                                String::new()
                            };
                            let has_dirty_containers_label = if has_dirty_containers {
                                ", dirty containers"
                            } else {
                                ""
                            };
                            let count = if let Some((_, count)) = parent_and_count {
                                format!(" {count}")
                            } else {
                                String::new()
                            };
                            let mut info = format!(
                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
                                 {in_progress}, \
                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
                            );
                            let children: Vec<_> = task.dirty_containers_with_count().collect();
                            drop(task);

                            if missing_upper {
                                info.push_str("\n ERROR: missing upper connection");
                            }

                            if has_dirty_containers || !children.is_empty() {
                                writeln!(info, "\n dirty tasks:").unwrap();

                                for (child_task_id, count) in children {
                                    let task_description = ctx
                                        .task(child_task_id, TaskDataCategory::Data)
                                        .get_task_description();
                                    if visited.insert(child_task_id) {
                                        let child_info = get_info(
                                            ctx,
                                            child_task_id,
                                            Some((task_id, count)),
                                            visited,
                                        );
                                        info.push_str(&indent(&child_info));
                                        if !info.ends_with('\n') {
                                            info.push('\n');
                                        }
                                    } else {
                                        writeln!(
                                            info,
                                            " {child_task_id} {task_description} {count} \
                                             (already visited)"
                                        )
                                        .unwrap();
                                    }
                                }
                            }
                            info
                        }
                        let info = get_info(&mut ctx, task_id, None, &mut visited);
                        format!(
                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
                        )
                    }
                });
                // Release locks before running the scheduling queue.
                drop(reader_task);
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        let reader_description = reader_task
            .as_ref()
            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
        // Task currently executing/scheduled/canceled: wait or fail.
        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
        {
            return value;
        }

        if let Some(output) = task.get_output() {
            let result = match output {
                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
                OutputValue::Error(error) => Err(error
                    .with_task_context(task.get_task_type(), Some(task_id))
                    .into()),
            };
            // Record the output dependency unless the task is immutable (in
            // which case its output can never invalidate the reader).
            if let Some(mut reader_task) = reader_task
                && options.tracking.should_track(result.is_err())
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                #[cfg(feature = "trace_task_output_dependencies")]
                let _span = tracing::trace_span!(
                    "add output dependency",
                    task = %task_id,
                    dependent_task = ?reader
                )
                .entered();
                let mut queue = LeafDistanceUpdateQueue::new();
                let reader = reader.unwrap();
                if task.add_output_dependent(reader) {
                    // Keep the reader's leaf distance strictly above the read
                    // task's; queue an update when that invariant would break.
                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
                    let reader_leaf_distance =
                        reader_task.get_leaf_distance().copied().unwrap_or_default();
                    if reader_leaf_distance.distance <= leaf_distance.distance {
                        queue.push(
                            reader,
                            leaf_distance.distance,
                            leaf_distance.max_distance_in_buffer,
                        );
                    }
                }

                drop(task);

                // If the dependency was marked outdated, un-marking it is
                // enough; otherwise record it as a fresh dependency.
                if !reader_task.remove_outdated_output_dependencies(&task_id) {
                    let _ = reader_task.add_output_dependencies(task_id);
                }
                drop(reader_task);

                queue.execute(&mut ctx);
            }

            return result;
        }
        drop(reader_task);

        // No output available: schedule the task for (re)execution and hand
        // the caller a listener on its done event.
        let note = EventDescription::new(|| {
            move || {
                if let Some(reader) = reader_description.as_ref() {
                    format!("try_read_task_output (recompute) from {reader}",)
                } else {
                    "try_read_task_output (recompute, untracked)".to_string()
                }
            }
        });

        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
            TaskExecutionReason::OutputNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
            note,
        );

        let old = task.set_in_progress(in_progress_state);
        debug_assert!(old.is_none(), "InProgress already exists");
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
793
    /// Tries to read a cell of `task_id` on behalf of `reader`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the caller must wait for a (re)computation,
    /// and `Err(_)` when the cell no longer exists or the task was canceled.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Helper: records the cell dependency in both directions (consumes
        // both guards). No-op for immutable tasks (unless verifying).
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // If the dependency was marked outdated, un-marking it is
                // enough; otherwise record it as a fresh dependency.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock task and reader together (task_pair) to keep lock ordering
        // consistent; the reader guard is only needed for tracked reads by a
        // different task.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the content out of the cell instead of
        // cloning it.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // Cell content not available. If the task is already running or
        // scheduled, just wait for the cell.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Validate that the cell can still exist; otherwise the dependency is
        // still recorded (so the reader gets invalidated if it reappears) and
        // an error is returned.
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Register (or join) an in-progress-cell listener; when one already
        // existed, a recomputation is already underway.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).global_name,
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Schedule the task so the cell gets recomputed.
        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
931
932 fn listen_to_cell(
933 &self,
934 task: &mut impl TaskGuard,
935 task_id: TaskId,
936 reader_task: &Option<impl TaskGuard>,
937 cell: CellId,
938 ) -> (EventListener, bool) {
939 let note = || {
940 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
941 move || {
942 if let Some(reader_desc) = reader_desc.as_ref() {
943 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
944 } else {
945 "try_read_task_cell (in progress, untracked)".to_string()
946 }
947 }
948 };
949 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
950 let listener = in_progress.event.listen_with_note(note);
952 return (listener, false);
953 }
954 let in_progress = InProgressCellState::new(task_id, cell);
955 let listener = in_progress.event.listen_with_note(note);
956 let old = task.insert_in_progress_cells(cell, in_progress);
957 debug_assert!(old.is_none(), "InProgressCell already exists");
958 (listener, true)
959 }
960
961 fn snapshot_and_persist(
962 &self,
963 parent_span: Option<tracing::Id>,
964 reason: &str,
965 ) -> Option<(Instant, bool)> {
966 let snapshot_span =
967 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
968 .entered();
969 let start = Instant::now();
970 debug_assert!(self.should_persist());
971
972 let suspended_operations;
973 {
974 let _span = tracing::info_span!("blocking").entered();
975 let mut snapshot_request = self.snapshot_request.lock();
976 snapshot_request.snapshot_requested = true;
977 let active_operations = self
978 .in_progress_operations
979 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
980 if active_operations != 0 {
981 self.operations_suspended
982 .wait_while(&mut snapshot_request, |_| {
983 self.in_progress_operations.load(Ordering::Relaxed)
984 != SNAPSHOT_REQUESTED_BIT
985 });
986 }
987 suspended_operations = snapshot_request
988 .suspended_operations
989 .iter()
990 .map(|op| op.arc().clone())
991 .collect::<Vec<_>>();
992 }
993 self.storage.start_snapshot();
994 let mut persisted_task_cache_log = self
995 .persisted_task_cache_log
996 .as_ref()
997 .map(|l| l.take(|i| i))
998 .unwrap_or_default();
999 let mut snapshot_request = self.snapshot_request.lock();
1000 snapshot_request.snapshot_requested = false;
1001 self.in_progress_operations
1002 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1003 self.snapshot_completed.notify_all();
1004 let snapshot_time = Instant::now();
1005 drop(snapshot_request);
1006
1007 #[cfg(feature = "print_cache_item_size")]
1008 #[derive(Default)]
1009 struct TaskCacheStats {
1010 data: usize,
1011 data_compressed: usize,
1012 data_count: usize,
1013 meta: usize,
1014 meta_compressed: usize,
1015 meta_count: usize,
1016 upper_count: usize,
1017 collectibles_count: usize,
1018 aggregated_collectibles_count: usize,
1019 children_count: usize,
1020 followers_count: usize,
1021 collectibles_dependents_count: usize,
1022 aggregated_dirty_containers_count: usize,
1023 output_size: usize,
1024 }
1025 #[cfg(feature = "print_cache_item_size")]
1026 impl TaskCacheStats {
1027 fn compressed_size(data: &[u8]) -> Result<usize> {
1028 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1029 data,
1030 &mut Vec::new(),
1031 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1032 )?)
1033 }
1034
1035 fn add_data(&mut self, data: &[u8]) {
1036 self.data += data.len();
1037 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1038 self.data_count += 1;
1039 }
1040
1041 fn add_meta(&mut self, data: &[u8]) {
1042 self.meta += data.len();
1043 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1044 self.meta_count += 1;
1045 }
1046
1047 fn add_counts(&mut self, storage: &TaskStorage) {
1048 let counts = storage.meta_counts();
1049 self.upper_count += counts.upper;
1050 self.collectibles_count += counts.collectibles;
1051 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1052 self.children_count += counts.children;
1053 self.followers_count += counts.followers;
1054 self.collectibles_dependents_count += counts.collectibles_dependents;
1055 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1056 if let Some(output) = storage.get_output() {
1057 use turbo_bincode::turbo_bincode_encode;
1058
1059 self.output_size += turbo_bincode_encode(&output)
1060 .map(|data| data.len())
1061 .unwrap_or(0);
1062 }
1063 }
1064 }
1065 #[cfg(feature = "print_cache_item_size")]
1066 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1067 Mutex::new(FxHashMap::default());
1068
1069 let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1070 if task_id.is_transient() {
1071 return (None, None);
1072 }
1073
1074 let meta_restored = inner.flags.meta_restored();
1075 let data_restored = inner.flags.data_restored();
1076
1077 let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1079 let data = data_restored.then(|| inner.clone_data_snapshot());
1080
1081 (meta, data)
1082 };
1083 let process = |task_id: TaskId,
1084 (meta, data): (Option<TaskStorage>, Option<TaskStorage>),
1085 buffer: &mut TurboBincodeBuffer| {
1086 #[cfg(feature = "print_cache_item_size")]
1087 if let Some(ref m) = meta {
1088 task_cache_stats
1089 .lock()
1090 .entry(self.debug_get_task_name(task_id))
1091 .or_default()
1092 .add_counts(m);
1093 }
1094 (
1095 task_id,
1096 meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
1097 data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
1098 )
1099 };
1100 let process_snapshot =
1101 |task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
1102 if task_id.is_transient() {
1103 return (task_id, None, None);
1104 }
1105
1106 #[cfg(feature = "print_cache_item_size")]
1107 if inner.flags.meta_modified() {
1108 task_cache_stats
1109 .lock()
1110 .entry(self.debug_get_task_name(task_id))
1111 .or_default()
1112 .add_counts(&inner);
1113 }
1114
1115 (
1117 task_id,
1118 inner.flags.meta_modified().then(|| {
1119 encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
1120 }),
1121 inner.flags.data_modified().then(|| {
1122 encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
1123 }),
1124 )
1125 };
1126
1127 let snapshot = self
1128 .storage
1129 .take_snapshot(&preprocess, &process, &process_snapshot);
1130
1131 let task_snapshots = snapshot
1132 .into_iter()
1133 .filter_map(|iter| {
1134 let mut iter = iter
1135 .filter_map(
1136 |(task_id, meta, data): (
1137 _,
1138 Option<Result<SmallVec<_>>>,
1139 Option<Result<SmallVec<_>>>,
1140 )| {
1141 let meta = match meta {
1142 Some(Ok(meta)) => {
1143 #[cfg(feature = "print_cache_item_size")]
1144 task_cache_stats
1145 .lock()
1146 .entry(self.debug_get_task_name(task_id))
1147 .or_default()
1148 .add_meta(&meta);
1149 Some(meta)
1150 }
1151 None => None,
1152 Some(Err(err)) => {
1153 println!(
1154 "Serializing task {} failed (meta): {:?}",
1155 self.debug_get_task_description(task_id),
1156 err
1157 );
1158 None
1159 }
1160 };
1161 let data = match data {
1162 Some(Ok(data)) => {
1163 #[cfg(feature = "print_cache_item_size")]
1164 task_cache_stats
1165 .lock()
1166 .entry(self.debug_get_task_name(task_id))
1167 .or_default()
1168 .add_data(&data);
1169 Some(data)
1170 }
1171 None => None,
1172 Some(Err(err)) => {
1173 println!(
1174 "Serializing task {} failed (data): {:?}",
1175 self.debug_get_task_description(task_id),
1176 err
1177 );
1178 None
1179 }
1180 };
1181 (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1182 },
1183 )
1184 .peekable();
1185 iter.peek().is_some().then_some(iter)
1186 })
1187 .collect::<Vec<_>>();
1188
1189 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1190
1191 drop(snapshot_span);
1192
1193 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1194 return Some((snapshot_time, false));
1195 }
1196
1197 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1198 {
1199 if let Err(err) = self.backing_storage.save_snapshot(
1200 suspended_operations,
1201 persisted_task_cache_log,
1202 task_snapshots,
1203 ) {
1204 println!("Persisting failed: {err:?}");
1205 return None;
1206 }
1207 #[cfg(feature = "print_cache_item_size")]
1208 {
1209 let mut task_cache_stats = task_cache_stats
1210 .into_inner()
1211 .into_iter()
1212 .collect::<Vec<_>>();
1213 if !task_cache_stats.is_empty() {
1214 use turbo_tasks::util::FormatBytes;
1215
1216 use crate::utils::markdown_table::print_markdown_table;
1217
1218 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1219 (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1220 .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1221 });
1222 println!(
1223 "Task cache stats: {} ({})",
1224 FormatBytes(
1225 task_cache_stats
1226 .iter()
1227 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1228 .sum::<usize>()
1229 ),
1230 FormatBytes(
1231 task_cache_stats
1232 .iter()
1233 .map(|(_, s)| s.data + s.meta)
1234 .sum::<usize>()
1235 )
1236 );
1237
1238 print_markdown_table(
1239 [
1240 "Task",
1241 " Total Size",
1242 " Data Size",
1243 " Data Count x Avg",
1244 " Data Count x Avg",
1245 " Meta Size",
1246 " Meta Count x Avg",
1247 " Meta Count x Avg",
1248 " Uppers",
1249 " Coll",
1250 " Agg Coll",
1251 " Children",
1252 " Followers",
1253 " Coll Deps",
1254 " Agg Dirty",
1255 " Output Size",
1256 ],
1257 task_cache_stats.iter(),
1258 |(task_desc, stats)| {
1259 [
1260 task_desc.to_string(),
1261 format!(
1262 " {} ({})",
1263 FormatBytes(stats.data_compressed + stats.meta_compressed),
1264 FormatBytes(stats.data + stats.meta)
1265 ),
1266 format!(
1267 " {} ({})",
1268 FormatBytes(stats.data_compressed),
1269 FormatBytes(stats.data)
1270 ),
1271 format!(" {} x", stats.data_count,),
1272 format!(
1273 "{} ({})",
1274 FormatBytes(
1275 stats
1276 .data_compressed
1277 .checked_div(stats.data_count)
1278 .unwrap_or(0)
1279 ),
1280 FormatBytes(
1281 stats.data.checked_div(stats.data_count).unwrap_or(0)
1282 ),
1283 ),
1284 format!(
1285 " {} ({})",
1286 FormatBytes(stats.meta_compressed),
1287 FormatBytes(stats.meta)
1288 ),
1289 format!(" {} x", stats.meta_count,),
1290 format!(
1291 "{} ({})",
1292 FormatBytes(
1293 stats
1294 .meta_compressed
1295 .checked_div(stats.meta_count)
1296 .unwrap_or(0)
1297 ),
1298 FormatBytes(
1299 stats.meta.checked_div(stats.meta_count).unwrap_or(0)
1300 ),
1301 ),
1302 format!(" {}", stats.upper_count),
1303 format!(" {}", stats.collectibles_count),
1304 format!(" {}", stats.aggregated_collectibles_count),
1305 format!(" {}", stats.children_count),
1306 format!(" {}", stats.followers_count),
1307 format!(" {}", stats.collectibles_dependents_count),
1308 format!(" {}", stats.aggregated_dirty_containers_count),
1309 format!(" {}", FormatBytes(stats.output_size)),
1310 ]
1311 },
1312 );
1313 }
1314 }
1315 }
1316
1317 let elapsed = start.elapsed();
1318 if elapsed > Duration::from_secs(10) {
1320 turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
1321 "Finished writing to filesystem cache".to_string(),
1322 elapsed,
1323 )));
1324 }
1325
1326 Some((snapshot_time, true))
1327 }
1328
    /// One-time backend initialization, called before any task executes.
    ///
    /// When restoring from persistent storage, replays operations that were
    /// persisted but not marked completed by a previous session so the stored
    /// graph reaches a consistent state. Afterwards, if the storage mode
    /// allows writes, schedules the background job that produces the initial
    /// snapshot.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                // Replay all interrupted operations within one execute context.
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // NOTE(review): the second `_span` shadows the first but does not
            // drop it; both span guards live until the end of this block.
            // Confirm whether both spans are intentional.
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1355
    /// Signals that shutdown has begun: sets the `stopping` flag (Release so
    /// the write is visible to Acquire loads) and wakes every listener
    /// currently waiting on `stopping_event`.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1360
    /// Final shutdown: optionally verifies the aggregation graph, persists a
    /// last snapshot when persistence is enabled, drops all in-memory task
    /// state, and shuts down the backing storage.
    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            // Persist before dropping in-memory state below.
            self.snapshot_and_persist(Span::current().into(), "stop");
        }
        drop_contents(&self.task_cache);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            // Shutdown errors are reported but not propagated at this point.
            println!("Shutting down failed: {err}");
        }
    }
1377
    /// Called when the task system becomes idle.
    ///
    /// With `verify_aggregation_graph` enabled, spawns a task that waits
    /// 5 seconds (canceled early if idleness ends) and then verifies the
    /// aggregation graph.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Debounce: only verify if the system stays idle for a while.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idleness ended concurrently with
                // the sleep completing.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1405
    /// Called when the task system leaves the idle state; clears the idle
    /// flag (used by the verification task) and wakes idle-end listeners.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1411
    /// Looks up (or creates) the persistent task for `task_type` and connects
    /// it as a child of `parent_task`.
    ///
    /// Lookup order: in-memory `task_cache` fast path, then (when restoring a
    /// partially loaded graph) the backing storage, then a fresh task id from
    /// the persistent id factory. New cache entries are appended to
    /// `persisted_task_cache_log` so they can be persisted.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        // Fast path: task is already known in memory.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        // Only consult the backing storage when restoring and the in-memory
        // cache is known to be incomplete.
        let check_backing_storage =
            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
        let tx = check_backing_storage
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let (task_id, task_type) = {
            if let Some(task_id) = unsafe {
                check_backing_storage
                    .then(|| {
                        self.backing_storage
                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
                            .expect("Failed to lookup task id")
                    })
                    .flatten()
            } {
                // Found in backing storage: populate the in-memory cache
                // unless another thread raced us to it.
                self.track_cache_hit(&task_type);
                let task_type = match raw_entry(&self.task_cache, &task_type) {
                    RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                    RawEntry::Vacant(e) => {
                        let task_type = Arc::new(task_type);
                        e.insert(task_type.clone(), task_id);
                        ArcOrOwned::Arc(task_type)
                    }
                };
                (task_id, task_type)
            } else {
                // Not persisted yet: re-check the in-memory cache (another
                // thread may have inserted meanwhile) or mint a new id.
                let (task_id, mut task_type) = match raw_entry(&self.task_cache, &task_type) {
                    RawEntry::Occupied(e) => {
                        let task_id = *e.get();
                        drop(e);
                        self.track_cache_hit(&task_type);
                        (task_id, ArcOrOwned::Owned(task_type))
                    }
                    RawEntry::Vacant(e) => {
                        let task_type = Arc::new(task_type);
                        let task_id = self.persisted_task_id_factory.get();
                        e.insert(task_type.clone(), task_id);
                        self.track_cache_miss(&task_type);
                        (task_id, ArcOrOwned::Arc(task_type))
                    }
                };
                // Record the new cache entry so it gets written out with the
                // next snapshot.
                if let Some(log) = &self.persisted_task_cache_log {
                    let task_type_arc: Arc<CachedTaskType> = Arc::from(task_type);
                    log.lock(task_id).push((task_type_arc.clone(), task_id));
                    task_type = ArcOrOwned::Arc(task_type_arc);
                }
                (task_id, task_type)
            }
        };

        unsafe {
            self.connect_child_with_tx(
                tx.as_ref(),
                parent_task,
                task_id,
                Some(task_type),
                turbo_tasks,
            )
        };

        task_id
    }
1499
    /// Looks up (or creates) the transient task for `task_type` and connects
    /// it as a child of `parent_task`.
    ///
    /// Panics (via `panic_persistent_calling_transient`) when a persistent
    /// parent tries to create a transient child, since persistent tasks must
    /// not depend on transient state.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        // Fast path: task already cached.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // Slow path: take the entry so concurrent inserts are handled —
        // another thread may have inserted between the check above and here.
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1557
1558 fn debug_trace_transient_task(
1561 &self,
1562 task_type: &CachedTaskType,
1563 cell_id: Option<CellId>,
1564 ) -> DebugTraceTransientTask {
1565 fn inner_id(
1568 backend: &TurboTasksBackendInner<impl BackingStorage>,
1569 task_id: TaskId,
1570 cell_type_id: Option<ValueTypeId>,
1571 visited_set: &mut FxHashSet<TaskId>,
1572 ) -> DebugTraceTransientTask {
1573 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1574 if visited_set.contains(&task_id) {
1575 let task_name = task_type.get_name();
1576 DebugTraceTransientTask::Collapsed {
1577 task_name,
1578 cell_type_id,
1579 }
1580 } else {
1581 inner_cached(backend, &task_type, cell_type_id, visited_set)
1582 }
1583 } else {
1584 DebugTraceTransientTask::Uncached { cell_type_id }
1585 }
1586 }
1587 fn inner_cached(
1588 backend: &TurboTasksBackendInner<impl BackingStorage>,
1589 task_type: &CachedTaskType,
1590 cell_type_id: Option<ValueTypeId>,
1591 visited_set: &mut FxHashSet<TaskId>,
1592 ) -> DebugTraceTransientTask {
1593 let task_name = task_type.get_name();
1594
1595 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1596 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1597 return None;
1601 };
1602 if task_id.is_transient() {
1603 Some(Box::new(inner_id(
1604 backend,
1605 task_id,
1606 cause_self_raw_vc.try_get_type_id(),
1607 visited_set,
1608 )))
1609 } else {
1610 None
1611 }
1612 });
1613 let cause_args = task_type
1614 .arg
1615 .get_raw_vcs()
1616 .into_iter()
1617 .filter_map(|raw_vc| {
1618 let Some(task_id) = raw_vc.try_get_task_id() else {
1619 return None;
1621 };
1622 if !task_id.is_transient() {
1623 return None;
1624 }
1625 Some((task_id, raw_vc.try_get_type_id()))
1626 })
1627 .collect::<IndexSet<_>>() .into_iter()
1629 .map(|(task_id, cell_type_id)| {
1630 inner_id(backend, task_id, cell_type_id, visited_set)
1631 })
1632 .collect();
1633
1634 DebugTraceTransientTask::Cached {
1635 task_name,
1636 cell_type_id,
1637 cause_self,
1638 cause_args,
1639 }
1640 }
1641 inner_cached(
1642 self,
1643 task_type,
1644 cell_id.map(|c| c.type_id),
1645 &mut FxHashSet::default(),
1646 )
1647 }
1648
1649 fn invalidate_task(
1650 &self,
1651 task_id: TaskId,
1652 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1653 ) {
1654 if !self.should_track_dependencies() {
1655 panic!("Dependency tracking is disabled so invalidation is not allowed");
1656 }
1657 operation::InvalidateOperation::run(
1658 smallvec![task_id],
1659 #[cfg(feature = "trace_task_dirty")]
1660 TaskDirtyCause::Invalidator,
1661 self.execute_context(turbo_tasks),
1662 );
1663 }
1664
1665 fn invalidate_tasks(
1666 &self,
1667 tasks: &[TaskId],
1668 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1669 ) {
1670 if !self.should_track_dependencies() {
1671 panic!("Dependency tracking is disabled so invalidation is not allowed");
1672 }
1673 operation::InvalidateOperation::run(
1674 tasks.iter().copied().collect(),
1675 #[cfg(feature = "trace_task_dirty")]
1676 TaskDirtyCause::Unknown,
1677 self.execute_context(turbo_tasks),
1678 );
1679 }
1680
1681 fn invalidate_tasks_set(
1682 &self,
1683 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1684 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1685 ) {
1686 if !self.should_track_dependencies() {
1687 panic!("Dependency tracking is disabled so invalidation is not allowed");
1688 }
1689 operation::InvalidateOperation::run(
1690 tasks.iter().copied().collect(),
1691 #[cfg(feature = "trace_task_dirty")]
1692 TaskDirtyCause::Unknown,
1693 self.execute_context(turbo_tasks),
1694 );
1695 }
1696
1697 fn invalidate_serialization(
1698 &self,
1699 task_id: TaskId,
1700 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1701 ) {
1702 if task_id.is_transient() {
1703 return;
1704 }
1705 let mut ctx = self.execute_context(turbo_tasks);
1706 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1707 task.invalidate_serialization();
1708 }
1709
1710 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1711 let task = self.storage.access_mut(task_id);
1712 if let Some(value) = task.get_persistent_task_type() {
1713 format!("{task_id:?} {}", value)
1714 } else if let Some(value) = task.get_transient_task_type() {
1715 format!("{task_id:?} {}", value)
1716 } else {
1717 format!("{task_id:?} unknown")
1718 }
1719 }
1720
1721 #[allow(dead_code)]
1722 fn debug_get_task_name(&self, task_id: TaskId) -> String {
1723 let task = self.storage.access_mut(task_id);
1724 if let Some(value) = task.get_persistent_task_type() {
1725 value.to_string()
1726 } else if let Some(value) = task.get_transient_task_type() {
1727 value.to_string()
1728 } else {
1729 "unknown".to_string()
1730 }
1731 }
1732
1733 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1734 let task = self.storage.access_mut(task_id);
1735 task.get_persistent_task_type().cloned()
1736 }
1737
    /// Transitions a task into the `Canceled` state after its execution was
    /// aborted, waking anything waiting on the task's done event.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = task.take_in_progress() {
            // Notify waiters regardless of whether the task was merely
            // scheduled or actively running.
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // `take_in_progress` above cleared the slot, so this must return None.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
    }
1760
    /// Transitions a scheduled task into `InProgress` and builds the future
    /// that will execute it.
    ///
    /// Returns `None` when the task is not currently in the `Scheduled`
    /// state (e.g. canceled or already running) or when a `Once` task's
    /// future has already been taken.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            if let Some(tasks) = task.prefetch() {
                // Release this task's guard while preparing other tasks,
                // then re-acquire it.
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the state unchanged and bail out.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move all current collectibles to the outdated set and drop
            // outdated entries that are no longer current; the outdated set
            // is reconciled as `OutdatedEdge::Collectible` when execution
            // completes.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Snapshot current dependencies as "outdated"; edges that are
                // not re-established during this execution are cleaned up
                // when the execution completes.
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can only be taken a single time;
                    // subsequent starts yield `None`.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1860
    /// Handles the result of a finished task execution, in phases:
    /// prepare (commit state diffs), invalidate output dependents, mark
    /// unfinished children dirty, connect new children, finish, cleanup.
    ///
    /// Returns `true` when the result was stale (the task was invalidated
    /// while running) and the task must be executed again; `false` when the
    /// result was committed.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: commit bookkeeping; `None` means the result was stale.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: the output changed, so dependents must be re-executed.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        // Phase 3: make sure children without output are scheduled.
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Phase 4: connect new children; may also detect staleness.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 5: store the output and finalize; last staleness check.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 6: drop data that is no longer referenced.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
1989
    /// First phase of task-completion handling: commits per-task bookkeeping
    /// and computes what the later phases need.
    ///
    /// Returns `None` when the execution was stale (never for once tasks):
    /// the task is put back into `Scheduled` and the result is discarded.
    /// Otherwise returns the new children to connect, whether the task just
    /// became immutable, the new output value (only if it changed) and the
    /// tasks depending on the output.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // A canceled task commits nothing; report an empty result.
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            // The task was invalidated while running: discard the result and
            // put it back into the Scheduled state.
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts;
            // only the never-connected ones have their counts decreased.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Reconcile the stored per-cell-type max indices with the counters
        // observed during this execution; unseen types are removed.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        // Candidate check for immutability: no invalidator, not session
        // dependent, no collectibles dependencies. The final decision also
        // requires every dependency to be immutable — checked further below,
        // after this task's guard has been released.
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Existing children not produced again become outdated edges;
            // children that already exist are removed from `new_children` so
            // they are not connected twice.
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Only produce a new output value when it actually differs from the
        // current one, so dependents are not invalidated needlessly.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && old_error == &err
                {
                    None
                } else {
                    Some(OutputValue::Error(err))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // Second half of the immutability check: all dependencies must
        // themselves be immutable.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2229
    /// Marks all tasks that depend on `task_id`'s output as dirty, processing
    /// large dependent sets in parallel chunks.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        if output_dependent_tasks.len() > 1 {
            // Warm up the task storage for all dependents before touching
            // them individually.
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        // Invalidates one dependent task, unless it is a once task, the
        // dependency edge is already outdated (then only marked dirty, not
        // stale), or the backward edge no longer exists.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // Once tasks run a single time and are never re-executed.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Large dependent set: split into chunks, each processed on its
            // own thread with its own context and aggregation queue.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2326
    /// Marks every task in `new_children` that has not produced an output yet as
    /// dirty.
    ///
    /// The dirty flag is set without marking the children stale
    /// (`make_stale == false`), and the aggregation updates produced by the dirty
    /// marking are executed afterwards in one batch.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
            // A child that already has an output completed at least once and is left
            // untouched.
            if !child_task.has_output() {
                let child_id = child_task.id();
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    false,
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        });

        queue.execute(ctx);
    }
2352
    /// Connects the new children discovered during an execution to the parent task.
    ///
    /// Returns `true` when the task was stale and has been rescheduled instead of
    /// connected (the caller must stop completing this execution), `false`
    /// otherwise. A canceled task returns `false` without connecting anything.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: a task that became stale while executing is rescheduled
        // right away instead of connecting its children. Once tasks never take this
        // path.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Release the task guard before running aggregation updates.
            drop(task);

            // NOTE(review): decreasing the active counts here presumably balances an
            // earlier increase for the new children — confirm against the scheduling
            // path.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2420
    /// Finalizes a completed task execution: publishes the new output, updates the
    /// task's dirty state, and wakes up waiters.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task was put back into the `Scheduled`
    ///   state instead of being finished (stale during execution, or a
    ///   `verify_determinism` re-run).
    /// - `in_progress_cells` is the map of cells that were reserved during the
    ///   execution; their events have already been notified.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled execution is dropped without publishing any result.
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // New children must already have been handled by an earlier completion step.
        debug_assert!(new_children.is_empty());

        // A stale (non-once) task is rescheduled and its result discarded: the output
        // is not set and `(true, None)` is returned before any publishing happens.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Keep the replaced output alive until after the task guard is dropped (see
        // the `drop(old_content)` below).
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake all readers waiting on cells that were reserved during this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // A session dependent task stays dirty overall but is considered clean for
        // the current session after this execution.
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        if old_dirtyness != new_dirtyness {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // When the self dirty state changed, compute the aggregated delta that has to
        // be propagated to upper tasks. Container counts are unchanged here; only the
        // self flags differ between old and new.
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            // The task became cleaner: notify "all clean" waiters and release the
            // active-until-clean marker.
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                if let Some(activeness_state) = task.get_activeness_mut() {
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, persistent non-once tasks are re-executed to
        // check they produce the same result again.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Notify completion only after the task guard has been released.
            done_event.notify(usize::MAX);
        }

        // Drop the replaced output outside of the task lock.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2589
2590 fn task_execution_completed_cleanup(
2591 &self,
2592 ctx: &mut impl ExecuteContext<'_>,
2593 task_id: TaskId,
2594 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2595 is_error: bool,
2596 ) -> Vec<SharedReference> {
2597 let mut task = ctx.task(task_id, TaskDataCategory::All);
2598 let mut removed_cell_data = Vec::new();
2599 if !is_error {
2603 let to_remove_persistent: Vec<_> = task
2610 .iter_persistent_cell_data()
2611 .filter_map(|(cell, _)| {
2612 cell_counters
2613 .get(&cell.type_id)
2614 .is_none_or(|start_index| cell.index >= *start_index)
2615 .then_some(*cell)
2616 })
2617 .collect();
2618
2619 let to_remove_transient: Vec<_> = task
2621 .iter_transient_cell_data()
2622 .filter_map(|(cell, _)| {
2623 cell_counters
2624 .get(&cell.type_id)
2625 .is_none_or(|start_index| cell.index >= *start_index)
2626 .then_some(*cell)
2627 })
2628 .collect();
2629 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2630 for cell in to_remove_persistent {
2631 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2632 removed_cell_data.push(data.into_untyped());
2633 }
2634 }
2635 for cell in to_remove_transient {
2636 if let Some(data) = task.remove_transient_cell_data(&cell) {
2637 removed_cell_data.push(data);
2638 }
2639 }
2640 }
2641
2642 task.cleanup_after_execution();
2646
2647 drop(task);
2648
2649 removed_cell_data
2651 }
2652
    /// Runs a backend background job.
    ///
    /// The snapshot jobs wait for the snapshot interval (the first snapshot waits
    /// longer), persist a snapshot, and then schedule a follow-up snapshot job. The
    /// wait is interrupted early when the system becomes idle (after an idle
    /// timeout) and aborted entirely when the backend is stopping.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        // The very first snapshot is delayed longer than the regular
                        // snapshot interval.
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // When the system is already idle (and this is a fresh
                            // idle period), snapshot after the idle timeout instead of
                            // waiting for the full interval.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only take the idle shortcut if still idle
                                        // when the timer fires.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot_and_persist(None, reason);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if !new_data {
                                // Nothing new was persisted; keep looping and wait for
                                // new data.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2741
2742 fn try_read_own_task_cell(
2743 &self,
2744 task_id: TaskId,
2745 cell: CellId,
2746 options: ReadCellOptions,
2747 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2748 ) -> Result<TypedCellContent> {
2749 let mut ctx = self.execute_context(turbo_tasks);
2750 let task = ctx.task(task_id, TaskDataCategory::Data);
2751 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2752 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2753 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2754 } else {
2755 Ok(CellContent(None).into_typed(cell.type_id))
2756 }
2757 }
2758
    /// Reads all collectibles of `collectible_type` aggregated at `task_id`.
    ///
    /// The task is first promoted to a root aggregation node so the aggregated
    /// collectibles of its whole subgraph are available on it. When `reader_id` is
    /// given, dependency edges are registered in both directions (dependents on the
    /// read task, dependencies on the reader).
    ///
    /// Returns a map from collectible cell to count: aggregated entries contribute
    /// `1` each, the task's own emits contribute their emit count.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Promote the task to a root aggregation node. The aggregation update
            // runs without holding the task guard, so drop and re-acquire it until
            // the root aggregation number is observed.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the subgraph, counted once each.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles emitted by the task itself, counted by emit count.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Register the forward edge on the reader, unless an outdated entry for
            // the same target could be reused.
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2823
2824 fn emit_collectible(
2825 &self,
2826 collectible_type: TraitTypeId,
2827 collectible: RawVc,
2828 task_id: TaskId,
2829 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2830 ) {
2831 self.assert_valid_collectible(task_id, collectible);
2832
2833 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2834 panic!("Collectibles need to be resolved");
2835 };
2836 let cell = CellRef {
2837 task: collectible_task,
2838 cell,
2839 };
2840 operation::UpdateCollectibleOperation::run(
2841 task_id,
2842 CollectibleRef {
2843 collectible_type,
2844 cell,
2845 },
2846 1,
2847 self.execute_context(turbo_tasks),
2848 );
2849 }
2850
2851 fn unemit_collectible(
2852 &self,
2853 collectible_type: TraitTypeId,
2854 collectible: RawVc,
2855 count: u32,
2856 task_id: TaskId,
2857 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2858 ) {
2859 self.assert_valid_collectible(task_id, collectible);
2860
2861 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2862 panic!("Collectibles need to be resolved");
2863 };
2864 let cell = CellRef {
2865 task: collectible_task,
2866 cell,
2867 };
2868 operation::UpdateCollectibleOperation::run(
2869 task_id,
2870 CollectibleRef {
2871 collectible_type,
2872 cell,
2873 },
2874 -(i32::try_from(count).unwrap()),
2875 self.execute_context(turbo_tasks),
2876 );
2877 }
2878
2879 fn update_task_cell(
2880 &self,
2881 task_id: TaskId,
2882 cell: CellId,
2883 is_serializable_cell_content: bool,
2884 content: CellContent,
2885 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2886 verification_mode: VerificationMode,
2887 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2888 ) {
2889 operation::UpdateCellOperation::run(
2890 task_id,
2891 cell,
2892 content,
2893 is_serializable_cell_content,
2894 updated_key_hashes,
2895 verification_mode,
2896 self.execute_context(turbo_tasks),
2897 );
2898 }
2899
    /// Marks the currently executing task as session dependent.
    ///
    /// No-op when dependency tracking is disabled. The task's aggregation number is
    /// raised to a high value first; then the `session_dependent` flag is set on the
    /// in-progress state, which makes the completion logic record
    /// `Dirtyness::SessionDependent`.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // Drop the task guard before running the aggregation update, then
            // re-acquire it afterwards.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Only meaningful while the task is actively executing.
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
2935
2936 fn mark_own_task_as_finished(
2937 &self,
2938 task: TaskId,
2939 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2940 ) {
2941 let mut ctx = self.execute_context(turbo_tasks);
2942 let mut task = ctx.task(task, TaskDataCategory::Data);
2943 if let Some(InProgressState::InProgress(box InProgressStateInner {
2944 marked_as_completed,
2945 ..
2946 })) = task.get_in_progress_mut()
2947 {
2948 *marked_as_completed = true;
2949 }
2954 }
2955
2956 fn set_own_task_aggregation_number(
2957 &self,
2958 task: TaskId,
2959 aggregation_number: u32,
2960 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2961 ) {
2962 let mut ctx = self.execute_context(turbo_tasks);
2963 AggregationUpdateQueue::run(
2964 AggregationUpdateJob::UpdateAggregationNumber {
2965 task_id: task,
2966 base_aggregation_number: aggregation_number,
2967 distance: None,
2968 },
2969 &mut ctx,
2970 );
2971 }
2972
2973 fn connect_task(
2974 &self,
2975 task: TaskId,
2976 parent_task: Option<TaskId>,
2977 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2978 ) {
2979 self.assert_not_persistent_calling_transient(parent_task, task, None);
2980 ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
2981 }
2982
2983 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
2984 let task_id = self.transient_task_id_factory.get();
2985 {
2986 let mut task = self.storage.access_mut(task_id);
2987 task.init_transient_task(task_id, task_type, self.should_track_activeness());
2988 }
2989 #[cfg(feature = "verify_aggregation_graph")]
2990 self.root_tasks.lock().insert(task_id);
2991 task_id
2992 }
2993
    /// Disposes a root task.
    ///
    /// If the task itself or any aggregated container is still dirty, the root
    /// marker is removed but the task stays active until everything is clean.
    /// Otherwise the activeness state is removed and "all clean" waiters are
    /// notified immediately.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                // No longer a root, but keep it active until everything is clean.
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3018
    /// Debug-only verification that walks the task graph from every registered root
    /// task and checks the aggregation invariants:
    /// - every reachable task has at least one upper that is an aggregated node,
    /// - a dirty task is listed as a dirty container in all of its uppers,
    /// - every aggregated collectible is actually emitted by some reachable task.
    ///
    /// Panics with diagnostics when an invariant is violated. Can be disabled at
    /// runtime with `TURBO_ENGINE_VERIFY_GRAPH=0`. When `idle` is set, the walk
    /// aborts as soon as the system stops being idle.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        // `use` items are scoped to the whole function body, so placing them after
        // the early return above is fine.
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over the children edges starting at the root task.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen as own emit somewhere, tasks aggregating it)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // Abort when the idle-only verification lost its idle window.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        ctx.get_task_description(task_id),
                        uppers
                    );
                }

                let aggregated_collectibles: Vec<_> = task
                    .iter_aggregated_collectibles()
                    .map(|(collectible, _)| collectible)
                    .collect();
                for collectible in aggregated_collectibles {
                    collectibles
                        .entry(collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // Mark collectibles that have an actual emitter in the subgraph.
                let own_collectibles: Vec<_> = task
                    .iter_collectibles_entries()
                    .filter_map(|(&collectible, &value)| (value > 0).then_some(collectible))
                    .collect::<Vec<_>>();
                for collectible in own_collectibles {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true
                    } else {
                        panic!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // A dirty task must appear as a dirty container in all its uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let task = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = task
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = task
                                .iter_aggregated_dirty_containers_entries()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.get_task_description(task_id),
                                upper_id,
                                ctx.get_task_description(upper_id),
                                containers,
                            );
                        }
                    }
                }
            }

            // Any aggregated collectible without an emitter is an invariant
            // violation; dump the upper chain of the producing task for debugging.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let desc = ctx.get_task_description(task_id);
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3207
3208 fn assert_not_persistent_calling_transient(
3209 &self,
3210 parent_id: Option<TaskId>,
3211 child_id: TaskId,
3212 cell_id: Option<CellId>,
3213 ) {
3214 if let Some(parent_id) = parent_id
3215 && !parent_id.is_transient()
3216 && child_id.is_transient()
3217 {
3218 self.panic_persistent_calling_transient(
3219 self.debug_get_task_description(parent_id),
3220 self.debug_get_cached_task_type(child_id).as_deref(),
3221 cell_id,
3222 );
3223 }
3224 }
3225
3226 fn panic_persistent_calling_transient(
3227 &self,
3228 parent: String,
3229 child: Option<&CachedTaskType>,
3230 cell_id: Option<CellId>,
3231 ) {
3232 let transient_reason = if let Some(child) = child {
3233 Cow::Owned(format!(
3234 " The callee is transient because it depends on:\n{}",
3235 self.debug_trace_transient_task(child, cell_id),
3236 ))
3237 } else {
3238 Cow::Borrowed("")
3239 };
3240 panic!(
3241 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3242 parent,
3243 child.map_or("unknown", |t| t.get_name()),
3244 transient_reason,
3245 );
3246 }
3247
3248 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3249 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3251 let task_info = if let Some(col_task_ty) = collectible
3253 .try_get_task_id()
3254 .map(|t| self.debug_get_task_description(t))
3255 {
3256 Cow::Owned(format!(" (return type of {col_task_ty})"))
3257 } else {
3258 Cow::Borrowed("")
3259 };
3260 panic!("Collectible{task_info} must be a ResolvedVc")
3261 };
3262 if col_task_id.is_transient() && !task_id.is_transient() {
3263 let transient_reason =
3264 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3265 Cow::Owned(format!(
3266 ". The collectible is transient because it depends on:\n{}",
3267 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3268 ))
3269 } else {
3270 Cow::Borrowed("")
3271 };
3272 panic!(
3274 "Collectible is transient, transient collectibles cannot be emitted from \
3275 persistent tasks{transient_reason}",
3276 )
3277 }
3278 }
3279}
3280
/// `Backend` implementation for the public wrapper type. Every method delegates to
/// the inner implementation stored in `self.0`.
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        // Note: the inner method takes `reader` before `cell`, unlike this trait
        // method's parameter order.
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            turbo_tasks,
        );
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
}
3531
/// A node in a debug trace explaining where a transient task (or transient
/// cell read) came from. Rendered as an indented tree via the `Display` impl.
enum DebugTraceTransientTask {
    /// A cached task whose transient causes were traced further, via the task
    /// itself (`cause_self`) and/or its arguments (`cause_args`).
    Cached {
        task_name: &'static str,
        // Type of the cell read from this task, if a cell read was involved.
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A cached task whose cause subtree is not shown; rendered with a
    /// "(collapsed)" suffix. NOTE(review): presumably collapsed to avoid
    /// repeating an already-traced subtree — confirm at the construction site.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A transient task for which no cached task name is available; rendered
    /// as "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3548
impl DebugTraceTransientTask {
    /// Recursively writes this trace node (and its causes) as an indented
    /// tree, one node per line, indenting each nesting `level` further.
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = " ".repeat(level);
        f.write_str(&indent)?;

        // Appends " (read cell of type <name>)" when a cell type is known,
        // resolving the type's global name from the value-type registry.
        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
            } else {
                Ok(())
            }
        }

        // First line: the task name (or placeholder) plus optional annotations.
        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        // Only `Cached` nodes carry cause subtrees; render them one level deeper.
        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}
3611
3612impl fmt::Display for DebugTraceTransientTask {
3613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3614 self.fmt_indented(f, 0)
3615 }
3616}
3617
3618fn far_future() -> Instant {
3620 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3625}
3626
/// Encodes one `category` of `data` (a task's storage) into a freshly
/// returned buffer.
///
/// `scratch_buffer` is cleared and reused as the encoding workspace so its
/// allocation survives across calls; the returned buffer is a copy of the
/// encoded bytes (`SmallVec::from_slice`).
///
/// With the `verify_serialization` cargo feature enabled, the encoded bytes
/// are immediately decoded into a fresh `TaskStorage` to assert round-trip
/// correctness, attaching the task id and category to any decode failure.
fn encode_task_data(
    task: TaskId,
    data: &TaskStorage,
    category: SpecificTaskDataCategory,
    scratch_buffer: &mut TurboBincodeBuffer,
) -> Result<TurboBincodeBuffer> {
    scratch_buffer.clear();
    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
    data.encode(category, &mut encoder)?;

    if cfg!(feature = "verify_serialization") {
        TaskStorage::new()
            .decode(
                category,
                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
            )
            .with_context(|| {
                format!(
                    "expected to be able to decode serialized data for '{category:?}' information \
                     for {task}"
                )
            })?;
    }
    Ok(SmallVec::from_slice(scratch_buffer))
}