1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33 backend::{
34 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37 VerificationMode,
38 },
39 event::{Event, EventDescription, EventListener},
40 message_queue::{TimingEvent, TraceEvent},
41 registry::get_value_type,
42 scope::scope_and_block,
43 task_statistics::TaskStatisticsApi,
44 trace::TraceRawVcs,
45 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49 operation::AnyOperation,
50 storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55 backend::{
56 operation::{
57 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60 TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61 is_root_node, make_task_dirty_internal, prepare_new_children,
62 },
63 storage::Storage,
64 storage_schema::{TaskStorage, TaskStorageAccessors},
65 },
66 backing_storage::{BackingStorage, SnapshotItem},
67 data::{
68 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69 InProgressState, InProgressStateInner, OutputValue, TransientTask,
70 },
71 error::TaskError,
72 utils::{
73 arc_or_owned::ArcOrOwned,
74 chunked_vec::ChunkedVec,
75 dash_map_drop_contents::drop_contents,
76 dash_map_raw_entry::{RawEntry, raw_entry},
77 ptr_eq_arc::PtrEqArc,
78 shard_amount::compute_shard_amount,
79 sharded::Sharded,
80 swap_retain,
81 },
82};
83
/// When more dependent tasks than this must be dirtied at once, the work is split up
/// and parallelized.
// NOTE(review): "PARALLIZATION" is a typo for "PARALLELIZATION"; renaming would touch
// uses outside this view, so it is left as-is.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Highest bit of `in_progress_operations`. While set, a snapshot has been requested
/// and running operations must suspend at their next suspend point (see
/// `operation_suspend_point` / `start_operation`).
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// Idle timeout used for snapshot scheduling (per the env var name — the consumer is
/// outside this view). Overridable via `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`;
/// defaults to 2 seconds. Unparsable values silently fall back to the default.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
100
/// Shared state (behind `TurboTasksBackendInner::snapshot_request`) coordinating a
/// snapshot with in-flight operations.
struct SnapshotRequest {
    // True while a snapshot is being taken; operations wait on `snapshot_completed`
    // until this flips back to false.
    snapshot_requested: bool,
    // Operations that suspended themselves for the current snapshot; the snapshot
    // collects and persists them so they can be resumed/replayed after a restore.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
105
106impl SnapshotRequest {
107 fn new() -> Self {
108 Self {
109 snapshot_requested: false,
110 suspended_operations: FxHashSet::default(),
111 }
112 }
113}
114
/// How the backend uses its backing storage.
pub enum StorageMode {
    /// Data is restored from storage but never written back
    /// (`should_restore() == true`, `should_persist() == false`).
    ReadOnly,
    /// Data is restored and persisted while running.
    ReadWrite,
    /// Data is restored; writing back is deferred until shutdown (per the name —
    /// both ReadWrite modes count as persisting, see `should_persist`).
    ReadWriteOnShutdown,
}
125
/// Configuration for [`TurboTasksBackend`].
pub struct BackendOptions {
    // Whether reads register dependency edges. Disabling this also force-disables
    // `active_tracking` (see `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    // Whether task activeness is tracked; only meaningful with dependency tracking.
    pub active_tracking: bool,

    // `None` disables the backing storage entirely (`should_restore() == false`).
    pub storage_mode: Option<StorageMode>,

    // Hint for internal shard sizing (see `compute_shard_amount`); `None` lets the
    // backend pick.
    pub num_workers: Option<usize>,

    // Use smaller initial allocations for sharded structures and storage.
    pub small_preallocation: bool,
}
150
151impl Default for BackendOptions {
152 fn default() -> Self {
153 Self {
154 dependency_tracking: true,
155 active_tracking: true,
156 storage_mode: Some(StorageMode::ReadWrite),
157 num_workers: None,
158 small_preallocation: false,
159 }
160 }
161}
162
/// Background job kinds handled by this backend (the dispatch site is outside this
/// view; names suggest the first and subsequent periodic snapshots — TODO confirm).
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
167
/// Public backend type: a thin `Arc` wrapper around [`TurboTasksBackendInner`] so the
/// state can be shared cheaply (e.g. cloned into listener notes and background jobs).
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded log of newly cached task types and their assigned ids; drained and handed
/// to the backing storage on snapshot (see `snapshot_and_persist`).
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
/// The actual backend state shared behind [`TurboTasksBackend`]'s `Arc`.
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    // When the backend was created; used as a time base.
    start_time: Instant,

    // Id factories: persistent ids occupy [next_free_task_id, TRANSIENT_TASK_BIT),
    // transient ids occupy [TRANSIENT_TASK_BIT, TaskId::MAX] (see `new`).
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Only `Some` when persisting is enabled; drained on snapshot.
    persisted_task_cache_log: Option<TaskCacheLog>,
    // Task type -> assigned task id cache.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    storage: Storage,

    // True when the backing storage already handed out task ids in a previous
    // session, i.e. the in-memory state is only a partial view of the stored graph.
    local_is_partial: bool,

    // Count of currently running operations, with `SNAPSHOT_REQUESTED_BIT` stolen as
    // a "snapshot requested" flag (see `start_operation` / `operation_suspend_point`).
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    // Notified when the last in-flight operation suspends (counter reaches exactly
    // SNAPSHOT_REQUESTED_BIT); the snapshot thread waits on this.
    operations_suspended: Condvar,
    // Notified when the snapshot finishes; suspended operations wait on this.
    snapshot_completed: Condvar,
    // Timestamp/counter of the last snapshot (updated outside this view).
    last_snapshot: AtomicU64,

    // Shutdown coordination flags/events.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
221
222impl<B: BackingStorage> TurboTasksBackend<B> {
223 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
224 Self(Arc::new(TurboTasksBackendInner::new(
225 options,
226 backing_storage,
227 )))
228 }
229
230 pub fn backing_storage(&self) -> &B {
231 &self.0.backing_storage
232 }
233}
234
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state, restoring the next free persistent task id
    /// from the backing storage and sizing internal shards for the worker count.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // The task cache log is only needed when we will eventually persist.
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        // Activeness tracking depends on dependency tracking; force it off otherwise.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persistent ids live below TRANSIENT_TASK_BIT, transient ids at/above it.
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            // Ids already handed out in a previous session imply the in-memory state
            // is only a partial view of the persisted graph.
            local_is_partial: next_task_id != TaskId::MIN,
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates a fresh execution context bound to this backend and the given
    /// turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// Cheap check whether a snapshot has requested that operations suspend.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Suspend point for long-running operations: when a snapshot is pending, the
    /// operation serializes its state (via `suspend`), parks itself until the
    /// snapshot completes, then resumes. A no-op otherwise.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the request may have completed meanwhile.
            if snapshot_request.snapshot_requested {
                // Register this operation so the snapshot can persist/resume it.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                // Counter == SNAPSHOT_REQUESTED_BIT means we were the last active
                // operation: wake the snapshot thread.
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Snapshot done: count ourselves active again and deregister.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Registers the start of an operation. If a snapshot is currently requested,
    /// blocks until it completes before counting this operation as active. The
    /// returned guard decrements the counter on drop.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            // Without persistence there are no snapshots to coordinate with.
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            // A snapshot is pending; undo the increment and wait for it to finish.
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    // We were the last blocker: wake the snapshot thread.
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// True when data is ever written back to the backing storage.
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// True when data is restored from the backing storage at all.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task-cache hit in the (optional) statistics collector.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task-cache miss in the (optional) statistics collector.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }

    /// Converts a stored [`TaskError`] into a [`TurboTasksExecutionError`],
    /// recursively converting error sources. For `TaskChain` errors the innermost
    /// task's output error is looked up and re-wrapped with one `TaskContext` layer
    /// per task in the chain (outermost last).
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                // The last chain entry is the task whose output holds the root error.
                let task_id = chain.last().unwrap();
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                // The output may have been recomputed/evicted since the error was
                // recorded; fall back to a placeholder panic in that case.
                let error = error.map_or_else(
                    || {
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap one context layer per chain entry, innermost first.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
443
/// RAII guard for an in-flight operation (see `start_operation`). `backend` is `None`
/// when persistence is disabled and no counting is necessary; otherwise dropping the
/// guard decrements `in_progress_operations`.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
447
448impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
449 fn drop(&mut self) {
450 if let Some(backend) = self.backend {
451 let fetch_sub = backend
452 .in_progress_operations
453 .fetch_sub(1, Ordering::AcqRel);
454 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
455 backend.operations_suspended.notify_all();
456 }
457 }
458 }
459}
460
/// Intermediate result prepared when a task execution completes (the producer and
/// consumer are outside this view; field meanings below follow the names — verify
/// against `task_execution_completed`).
struct TaskExecutionCompletePrepareResult {
    // Children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    // Whether the task can now be treated as immutable.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // Output to store, if it changed.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output that need invalidation.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
470
471impl<B: BackingStorage> TurboTasksBackendInner<B> {
473 fn connect_child(
474 &self,
475 parent_task: Option<TaskId>,
476 child_task: TaskId,
477 task_type: Option<ArcOrOwned<CachedTaskType>>,
478 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479 ) {
480 operation::ConnectChildOperation::run(
481 parent_task,
482 child_task,
483 task_type,
484 self.execute_context(turbo_tasks),
485 );
486 }
487
    /// Tries to read the output of `task_id`, optionally registering a dependency
    /// edge from `reader`.
    ///
    /// Returns `Ok(Ok(vc))` when the output is available, `Ok(Err(listener))` when
    /// the caller must wait (task scheduled/in progress, or a strongly consistent
    /// read waiting for the subgraph to become clean), and `Err(_)` when the task
    /// errored or was canceled.
    fn try_read_task_output(
        self: &Arc<Self>,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, None);

        let mut ctx = self.execute_context(turbo_tasks);
        // Only lock the reader task when a dependency will actually be tracked.
        // NOTE(review): the `is_some_and` check duplicates the following
        // `let Some(reader_id) = reader && reader_id != task_id` let-chain; one of
        // the two is redundant and could be removed.
        let need_reader_task = if self.should_track_dependencies()
            && !matches!(options.tracking, ReadTracking::Untracked)
            && reader.is_some_and(|reader_id| reader_id != task_id)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            Some(reader_id)
        } else {
            None
        };
        // Lock task and reader together via `task_pair` (consistent lock ordering).
        let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // Attaches a listener to the task's done event, with a lazily-built debug
        // note naming the reader.
        fn listen_to_done_event(
            reader_description: Option<EventDescription>,
            tracking: ReadTracking,
            done_event: &Event,
        ) -> EventListener {
            done_event.listen_with_note(move || {
                move || {
                    if let Some(reader_description) = reader_description.as_ref() {
                        format!(
                            "try_read_task_output from {} ({})",
                            reader_description, tracking
                        )
                    } else {
                        format!("try_read_task_output ({})", tracking)
                    }
                }
            })
        }

        // If the task is scheduled or executing, the caller must wait for the done
        // event; a canceled task surfaces as an error. `None` means not in progress.
        fn check_in_progress(
            task: &impl TaskGuard,
            reader_description: Option<EventDescription>,
            tracking: ReadTracking,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match task.get_in_progress() {
                Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
                    listen_to_done_event(reader_description, tracking, done_event),
                ))),
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done_event, ..
                })) => Some(Ok(Err(listen_to_done_event(
                    reader_description,
                    tracking,
                    done_event,
                )))),
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    task.get_task_description()
                ))),
                None => None,
            }
        }

        if matches!(options.consistency, ReadConsistency::Strong) {
            // A strongly consistent read requires the task to be a root aggregation
            // node so dirtiness of its entire subgraph is observable from here.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                // The aggregation update must run without holding the task locks.
                drop(task);
                drop(reader_task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                // Re-acquire the locks and re-check the aggregation number.
                (task, reader_task) = if let Some(reader_id) = need_reader_task {
                    let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
                    (task, Some(reader))
                } else {
                    (ctx.task(task_id, TaskDataCategory::All), None)
                }
            }

            let is_dirty = task.is_dirty();

            let has_dirty_containers = task.has_dirty_containers();
            if has_dirty_containers || is_dirty.is_some() {
                // Subgraph not clean yet: mark it active-until-clean and wait on the
                // all_clean_event.
                let activeness = task.get_activeness_mut();
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                let activeness = if let Some(activeness) = activeness {
                    activeness.set_active_until_clean();
                    activeness
                } else {
                    // Task wasn't active before: schedule all dirty containers (and
                    // the task itself) so the subgraph can actually become clean.
                    if ctx.should_track_activeness() {
                        task_ids_to_schedule = task.dirty_containers().collect();
                        task_ids_to_schedule.push(task_id);
                    }
                    let activeness =
                        task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
                    activeness.set_active_until_clean();
                    activeness
                };
                // The note below is only evaluated lazily (when listener notes are
                // requested); it walks and pretty-prints the dirty subgraph for
                // debugging stuck strongly-consistent reads.
                let listener = activeness.all_clean_event.listen_with_note(move || {
                    let this = self.clone();
                    let tt = turbo_tasks.pin();
                    move || {
                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
                        let mut ctx = this.execute_context(tt);
                        let mut visited = FxHashSet::default();
                        // Indents every line of a multi-line string by one level.
                        fn indent(s: &str) -> String {
                            s.split_inclusive('\n')
                                .flat_map(|line: &str| [" ", line].into_iter())
                                .collect::<String>()
                        }
                        // Recursively renders a task and its dirty containers.
                        fn get_info(
                            ctx: &mut impl ExecuteContext<'_>,
                            task_id: TaskId,
                            parent_and_count: Option<(TaskId, i32)>,
                            visited: &mut FxHashSet<TaskId>,
                        ) -> String {
                            let task = ctx.task(task_id, TaskDataCategory::All);
                            let is_dirty = task.is_dirty();
                            let in_progress =
                                task.get_in_progress()
                                    .map_or("not in progress", |p| match p {
                                        InProgressState::InProgress(_) => "in progress",
                                        InProgressState::Scheduled { .. } => "scheduled",
                                        InProgressState::Canceled => "canceled",
                                    });
                            let activeness = task.get_activeness().map_or_else(
                                || "not active".to_string(),
                                |activeness| format!("{activeness:?}"),
                            );
                            let aggregation_number = get_aggregation_number(&task);
                            // A dirty child must have its parent among its uppers;
                            // a missing edge indicates aggregation graph corruption.
                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
                            {
                                let uppers = get_uppers(&task);
                                !uppers.contains(&parent_task_id)
                            } else {
                                false
                            };

                            let has_dirty_containers = task.has_dirty_containers();

                            let task_description = task.get_task_description();
                            let is_dirty_label = if let Some(parent_priority) = is_dirty {
                                format!(", dirty({parent_priority})")
                            } else {
                                String::new()
                            };
                            let has_dirty_containers_label = if has_dirty_containers {
                                ", dirty containers"
                            } else {
                                ""
                            };
                            let count = if let Some((_, count)) = parent_and_count {
                                format!(" {count}")
                            } else {
                                String::new()
                            };
                            let mut info = format!(
                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
                                 {in_progress}, \
                                 {activeness}{is_dirty_label}{has_dirty_containers_label})",
                            );
                            let children: Vec<_> = task.dirty_containers_with_count().collect();
                            drop(task);

                            if missing_upper {
                                info.push_str("\n ERROR: missing upper connection");
                            }

                            if has_dirty_containers || !children.is_empty() {
                                writeln!(info, "\n dirty tasks:").unwrap();

                                for (child_task_id, count) in children {
                                    let task_description = ctx
                                        .task(child_task_id, TaskDataCategory::Data)
                                        .get_task_description();
                                    if visited.insert(child_task_id) {
                                        let child_info = get_info(
                                            ctx,
                                            child_task_id,
                                            Some((task_id, count)),
                                            visited,
                                        );
                                        info.push_str(&indent(&child_info));
                                        if !info.ends_with('\n') {
                                            info.push('\n');
                                        }
                                    } else {
                                        writeln!(
                                            info,
                                            " {child_task_id} {task_description} {count} \
                                             (already visited)"
                                        )
                                        .unwrap();
                                    }
                                }
                            }
                            info
                        }
                        let info = get_info(&mut ctx, task_id, None, &mut visited);
                        format!(
                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
                        )
                    }
                });
                // Release the locks before running the scheduling queue.
                drop(reader_task);
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        let reader_description = reader_task
            .as_ref()
            .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
        if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
        {
            return value;
        }

        if let Some(output) = task.get_output() {
            let result = match output {
                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
                OutputValue::Error(error) => Err(error.clone()),
            };
            // Record the output dependency edge (skipped for immutable tasks unless
            // verify_immutable wants the edge for checking).
            if let Some(mut reader_task) = reader_task.take()
                && options.tracking.should_track(result.is_err())
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                #[cfg(feature = "trace_task_output_dependencies")]
                let _span = tracing::trace_span!(
                    "add output dependency",
                    task = %task_id,
                    dependent_task = ?reader
                )
                .entered();
                let mut queue = LeafDistanceUpdateQueue::new();
                let reader = reader.unwrap();
                if task.add_output_dependent(reader) {
                    // New edge: the reader's leaf distance must stay greater than
                    // this task's, otherwise enqueue a leaf-distance update.
                    let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
                    let reader_leaf_distance =
                        reader_task.get_leaf_distance().copied().unwrap_or_default();
                    if reader_leaf_distance.distance <= leaf_distance.distance {
                        queue.push(
                            reader,
                            leaf_distance.distance,
                            leaf_distance.max_distance_in_buffer,
                        );
                    }
                }

                drop(task);

                // Prefer un-outdating an existing (outdated) dependency over adding
                // a fresh one.
                if !reader_task.remove_outdated_output_dependencies(&task_id) {
                    let _ = reader_task.add_output_dependencies(task_id);
                }
                drop(reader_task);

                queue.execute(&mut ctx);
            } else {
                drop(task);
            }

            return result.map_err(|error| {
                self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
                    .with_task_context(task_id, turbo_tasks.pin())
                    .into()
            });
        }
        drop(reader_task);

        // No output available and not in progress: schedule the task for
        // (re)computation and hand the caller a listener for its completion.
        let note = EventDescription::new(|| {
            move || {
                if let Some(reader) = reader_description.as_ref() {
                    format!("try_read_task_output (recompute) from {reader}",)
                } else {
                    "try_read_task_output (recompute, untracked)".to_string()
                }
            }
        });

        let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
            TaskExecutionReason::OutputNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
            note,
        );

        let old = task.set_in_progress(in_progress_state);
        debug_assert!(old.is_none(), "InProgress already exists");
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
833
    /// Tries to read cell `cell` of `task_id`, optionally registering a cell
    /// dependency from `reader`.
    ///
    /// Returns `Ok(Ok(content))` when the cell has data, `Ok(Err(listener))` when the
    /// caller must wait for the task to (re)compute the cell, and `Err(_)` when the
    /// cell no longer exists or the task was canceled.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the reader -> (task, cell) dependency edge in both directions;
        // skipped for immutable tasks unless verify_immutable wants the edge.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // Prefer un-outdating an existing dependency over adding a new one.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock the reader alongside the task only when a dependency will be tracked.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the data out of the cell instead of cloning it.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No data yet: if the task is already scheduled or running, just wait for
        // the cell to be filled.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Distinguish "cell will be recomputed" from "cell is gone for good": an
        // index at/above the max ever used for this type can never reappear.
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            // Still track the (failing) dependency so the reader is invalidated if
            // the cell comes back.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // The cell can be recomputed: register a listener and schedule the task.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            // Someone else already triggered the recomputation.
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
971
972 fn listen_to_cell(
973 &self,
974 task: &mut impl TaskGuard,
975 task_id: TaskId,
976 reader_task: &Option<impl TaskGuard>,
977 cell: CellId,
978 ) -> (EventListener, bool) {
979 let note = || {
980 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
981 move || {
982 if let Some(reader_desc) = reader_desc.as_ref() {
983 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
984 } else {
985 "try_read_task_cell (in progress, untracked)".to_string()
986 }
987 }
988 };
989 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
990 let listener = in_progress.event.listen_with_note(note);
992 return (listener, false);
993 }
994 let in_progress = InProgressCellState::new(task_id, cell);
995 let listener = in_progress.event.listen_with_note(note);
996 let old = task.insert_in_progress_cells(cell, in_progress);
997 debug_assert!(old.is_none(), "InProgressCell already exists");
998 (listener, true)
999 }
1000
1001 fn snapshot_and_persist(
1002 &self,
1003 parent_span: Option<tracing::Id>,
1004 reason: &str,
1005 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1006 ) -> Option<(Instant, bool)> {
1007 let snapshot_span =
1008 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1009 .entered();
1010 let start = Instant::now();
1011 let wall_start = SystemTime::now();
1015 debug_assert!(self.should_persist());
1016
1017 let suspended_operations;
1018 {
1019 let _span = tracing::info_span!("blocking").entered();
1020 let mut snapshot_request = self.snapshot_request.lock();
1021 snapshot_request.snapshot_requested = true;
1022 let active_operations = self
1023 .in_progress_operations
1024 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1025 if active_operations != 0 {
1026 self.operations_suspended
1027 .wait_while(&mut snapshot_request, |_| {
1028 self.in_progress_operations.load(Ordering::Relaxed)
1029 != SNAPSHOT_REQUESTED_BIT
1030 });
1031 }
1032 suspended_operations = snapshot_request
1033 .suspended_operations
1034 .iter()
1035 .map(|op| op.arc().clone())
1036 .collect::<Vec<_>>();
1037 }
1038 self.storage.start_snapshot();
1039 let mut persisted_task_cache_log = self
1040 .persisted_task_cache_log
1041 .as_ref()
1042 .map(|l| l.take(|i| i))
1043 .unwrap_or_default();
1044 let mut snapshot_request = self.snapshot_request.lock();
1045 snapshot_request.snapshot_requested = false;
1046 self.in_progress_operations
1047 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1048 self.snapshot_completed.notify_all();
1049 let snapshot_time = Instant::now();
1050 drop(snapshot_request);
1051
1052 #[cfg(feature = "print_cache_item_size")]
1053 #[derive(Default)]
1054 struct TaskCacheStats {
1055 data: usize,
1056 data_compressed: usize,
1057 data_count: usize,
1058 meta: usize,
1059 meta_compressed: usize,
1060 meta_count: usize,
1061 upper_count: usize,
1062 collectibles_count: usize,
1063 aggregated_collectibles_count: usize,
1064 children_count: usize,
1065 followers_count: usize,
1066 collectibles_dependents_count: usize,
1067 aggregated_dirty_containers_count: usize,
1068 output_size: usize,
1069 }
1070 #[cfg(feature = "print_cache_item_size")]
1071 impl TaskCacheStats {
1072 fn compressed_size(data: &[u8]) -> Result<usize> {
1073 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1074 data,
1075 &mut Vec::new(),
1076 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1077 )?)
1078 }
1079
1080 fn add_data(&mut self, data: &[u8]) {
1081 self.data += data.len();
1082 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1083 self.data_count += 1;
1084 }
1085
1086 fn add_meta(&mut self, data: &[u8]) {
1087 self.meta += data.len();
1088 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1089 self.meta_count += 1;
1090 }
1091
1092 fn add_counts(&mut self, storage: &TaskStorage) {
1093 let counts = storage.meta_counts();
1094 self.upper_count += counts.upper;
1095 self.collectibles_count += counts.collectibles;
1096 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1097 self.children_count += counts.children;
1098 self.followers_count += counts.followers;
1099 self.collectibles_dependents_count += counts.collectibles_dependents;
1100 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1101 if let Some(output) = storage.get_output() {
1102 use turbo_bincode::turbo_bincode_encode;
1103
1104 self.output_size += turbo_bincode_encode(&output)
1105 .map(|data| data.len())
1106 .unwrap_or(0);
1107 }
1108 }
1109 }
1110 #[cfg(feature = "print_cache_item_size")]
1111 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1112 Mutex::new(FxHashMap::default());
1113
1114 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1121 let encode_category = |task_id: TaskId,
1122 data: &TaskStorage,
1123 category: SpecificTaskDataCategory,
1124 buffer: &mut TurboBincodeBuffer|
1125 -> Option<TurboBincodeBuffer> {
1126 match encode_task_data(task_id, data, category, buffer) {
1127 Ok(encoded) => {
1128 #[cfg(feature = "print_cache_item_size")]
1129 {
1130 let mut stats = task_cache_stats.lock();
1131 let entry = stats
1132 .entry(self.get_task_name(task_id, turbo_tasks))
1133 .or_default();
1134 match category {
1135 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1136 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1137 }
1138 }
1139 Some(encoded)
1140 }
1141 Err(err) => {
1142 eprintln!(
1143 "Serializing task {} failed ({:?}): {:?}",
1144 self.debug_get_task_description(task_id),
1145 category,
1146 err
1147 );
1148 None
1149 }
1150 }
1151 };
1152 if task_id.is_transient() {
1153 unreachable!("transient task_ids should never be enqueued to be persisted");
1154 }
1155
1156 let encode_meta = inner.flags.meta_modified();
1157 let encode_data = inner.flags.data_modified();
1158
1159 #[cfg(feature = "print_cache_item_size")]
1160 if encode_meta {
1161 task_cache_stats
1162 .lock()
1163 .entry(self.get_task_name(task_id, turbo_tasks))
1164 .or_default()
1165 .add_counts(inner);
1166 }
1167
1168 let meta = if encode_meta {
1169 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1170 } else {
1171 None
1172 };
1173
1174 let data = if encode_data {
1175 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1176 } else {
1177 None
1178 };
1179
1180 SnapshotItem {
1181 task_id,
1182 meta,
1183 data,
1184 }
1185 };
1186
1187 let task_snapshots = self.storage.take_snapshot(&process);
1189
1190 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1191
1192 drop(snapshot_span);
1193 let snapshot_duration = start.elapsed();
1194 let task_count = task_snapshots.len();
1195
1196 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1197 return Some((snapshot_time, false));
1198 }
1199
1200 let persist_start = Instant::now();
1201 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1202 {
1203 if let Err(err) = self.backing_storage.save_snapshot(
1204 suspended_operations,
1205 persisted_task_cache_log,
1206 task_snapshots,
1207 ) {
1208 eprintln!("Persisting failed: {err:?}");
1209 return None;
1210 }
1211 #[cfg(feature = "print_cache_item_size")]
1212 {
1213 let mut task_cache_stats = task_cache_stats
1214 .into_inner()
1215 .into_iter()
1216 .collect::<Vec<_>>();
1217 if !task_cache_stats.is_empty() {
1218 use turbo_tasks::util::FormatBytes;
1219
1220 use crate::utils::markdown_table::print_markdown_table;
1221
1222 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1223 (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1224 .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1225 });
1226 println!(
1227 "Task cache stats: {} ({})",
1228 FormatBytes(
1229 task_cache_stats
1230 .iter()
1231 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1232 .sum::<usize>()
1233 ),
1234 FormatBytes(
1235 task_cache_stats
1236 .iter()
1237 .map(|(_, s)| s.data + s.meta)
1238 .sum::<usize>()
1239 )
1240 );
1241
1242 print_markdown_table(
1243 [
1244 "Task",
1245 " Total Size",
1246 " Data Size",
1247 " Data Count x Avg",
1248 " Data Count x Avg",
1249 " Meta Size",
1250 " Meta Count x Avg",
1251 " Meta Count x Avg",
1252 " Uppers",
1253 " Coll",
1254 " Agg Coll",
1255 " Children",
1256 " Followers",
1257 " Coll Deps",
1258 " Agg Dirty",
1259 " Output Size",
1260 ],
1261 task_cache_stats.iter(),
1262 |(task_desc, stats)| {
1263 [
1264 task_desc.to_string(),
1265 format!(
1266 " {} ({})",
1267 FormatBytes(stats.data_compressed + stats.meta_compressed),
1268 FormatBytes(stats.data + stats.meta)
1269 ),
1270 format!(
1271 " {} ({})",
1272 FormatBytes(stats.data_compressed),
1273 FormatBytes(stats.data)
1274 ),
1275 format!(" {} x", stats.data_count,),
1276 format!(
1277 "{} ({})",
1278 FormatBytes(
1279 stats
1280 .data_compressed
1281 .checked_div(stats.data_count)
1282 .unwrap_or(0)
1283 ),
1284 FormatBytes(
1285 stats.data.checked_div(stats.data_count).unwrap_or(0)
1286 ),
1287 ),
1288 format!(
1289 " {} ({})",
1290 FormatBytes(stats.meta_compressed),
1291 FormatBytes(stats.meta)
1292 ),
1293 format!(" {} x", stats.meta_count,),
1294 format!(
1295 "{} ({})",
1296 FormatBytes(
1297 stats
1298 .meta_compressed
1299 .checked_div(stats.meta_count)
1300 .unwrap_or(0)
1301 ),
1302 FormatBytes(
1303 stats.meta.checked_div(stats.meta_count).unwrap_or(0)
1304 ),
1305 ),
1306 format!(" {}", stats.upper_count),
1307 format!(" {}", stats.collectibles_count),
1308 format!(" {}", stats.aggregated_collectibles_count),
1309 format!(" {}", stats.children_count),
1310 format!(" {}", stats.followers_count),
1311 format!(" {}", stats.collectibles_dependents_count),
1312 format!(" {}", stats.aggregated_dirty_containers_count),
1313 format!(" {}", FormatBytes(stats.output_size)),
1314 ]
1315 },
1316 );
1317 }
1318 }
1319 }
1320
1321 let elapsed = start.elapsed();
1322 let persist_duration = persist_start.elapsed();
1323 if elapsed > Duration::from_secs(10) {
1325 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1326 "Finished writing to filesystem cache".to_string(),
1327 elapsed,
1328 )));
1329 }
1330
1331 let wall_start_ms = wall_start
1332 .duration_since(SystemTime::UNIX_EPOCH)
1333 .unwrap_or_default()
1334 .as_secs_f64()
1336 * 1000.0;
1337 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1338 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1339 "turbopack-persistence",
1340 wall_start_ms,
1341 wall_end_ms,
1342 vec![
1343 ("reason", serde_json::Value::from(reason)),
1344 (
1345 "snapshot_duration_ms",
1346 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1347 ),
1348 (
1349 "persist_duration_ms",
1350 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1351 ),
1352 ("task_count", serde_json::Value::from(task_count)),
1353 ],
1354 )));
1355
1356 Some((snapshot_time, true))
1357 }
1358
1359 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1360 if self.should_restore() {
1361 let uncompleted_operations = self
1365 .backing_storage
1366 .uncompleted_operations()
1367 .expect("Failed to get uncompleted operations");
1368 if !uncompleted_operations.is_empty() {
1369 let mut ctx = self.execute_context(turbo_tasks);
1370 for op in uncompleted_operations {
1371 op.execute(&mut ctx);
1372 }
1373 }
1374 }
1375
1376 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1379 let _span = trace_span!("persisting background job").entered();
1381 let _span = tracing::info_span!("thread").entered();
1382 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1383 }
1384 }
1385
    /// Signals that the backend is shutting down: sets the `stopping` flag
    /// (Release ordering so observers see it) and wakes all waiters on the
    /// stopping event.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1390
1391 #[allow(unused_variables)]
1392 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1393 #[cfg(feature = "verify_aggregation_graph")]
1394 {
1395 self.is_idle.store(false, Ordering::Release);
1396 self.verify_aggregation_graph(turbo_tasks, false);
1397 }
1398 if self.should_persist() {
1399 self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1400 }
1401 drop_contents(&self.task_cache);
1402 self.storage.drop_contents();
1403 if let Err(err) = self.backing_storage.shutdown() {
1404 println!("Shutting down failed: {err}");
1405 }
1406 }
1407
    /// Called when the system becomes idle. Notifies idle waiters and (with
    /// `verify_aggregation_graph`) schedules a debounced aggregation-graph
    /// verification.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Debounce: only verify when we stay idle for 5 seconds;
                // bail out early if idle ends first.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idle ended between the sleep
                // completing and this point.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1435
    /// Called when the idle period ends.
    fn idle_end(&self) {
        // Note: the cfg attribute gates only the following statement; the
        // event below is notified regardless of the feature.
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1441
    /// Looks up (or creates) the persistent task for `task_type` and connects
    /// it as a child of `parent_task`. Returns the task's id.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Fast path: already present in the in-memory task cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Task known to the backend (e.g. restored from storage); make
            // sure the in-memory cache has an entry. A concurrent insert may
            // have raced us, hence the occupied/vacant split.
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            // Unknown task: insert into the cache, allocating a fresh id
            // unless a concurrent insert won the race.
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    e.insert(task_type.clone(), task_id);
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    // Record the new cache entry so it is persisted with the
                    // next snapshot.
                    if let Some(log) = &self.persisted_task_cache_log {
                        log.lock(task_id).push((task_type.clone(), task_id));
                    }
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        // Newly created root tasks get the maximum aggregation number so they
        // act as aggregation roots.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1523
    /// Looks up (or creates) the transient task for `task_type` and connects
    /// it as a child of `parent_task`. Panics when a persistent parent tries
    /// to call a transient task.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Persistent tasks must not call transient tasks: the result could
        // not be restored from the persistent cache in a later session.
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        // Fast path: already present in the in-memory task cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        match raw_entry(&self.task_cache, &task_type) {
            // A concurrent insert won the race: reuse its task id.
            RawEntry::Occupied(e) => {
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                // Root tasks get the maximum aggregation number so they act
                // as aggregation roots.
                if is_root {
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1596
1597 fn debug_trace_transient_task(
1600 &self,
1601 task_type: &CachedTaskType,
1602 cell_id: Option<CellId>,
1603 ) -> DebugTraceTransientTask {
1604 fn inner_id(
1607 backend: &TurboTasksBackendInner<impl BackingStorage>,
1608 task_id: TaskId,
1609 cell_type_id: Option<ValueTypeId>,
1610 visited_set: &mut FxHashSet<TaskId>,
1611 ) -> DebugTraceTransientTask {
1612 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1613 if visited_set.contains(&task_id) {
1614 let task_name = task_type.get_name();
1615 DebugTraceTransientTask::Collapsed {
1616 task_name,
1617 cell_type_id,
1618 }
1619 } else {
1620 inner_cached(backend, &task_type, cell_type_id, visited_set)
1621 }
1622 } else {
1623 DebugTraceTransientTask::Uncached { cell_type_id }
1624 }
1625 }
1626 fn inner_cached(
1627 backend: &TurboTasksBackendInner<impl BackingStorage>,
1628 task_type: &CachedTaskType,
1629 cell_type_id: Option<ValueTypeId>,
1630 visited_set: &mut FxHashSet<TaskId>,
1631 ) -> DebugTraceTransientTask {
1632 let task_name = task_type.get_name();
1633
1634 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1635 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1636 return None;
1640 };
1641 if task_id.is_transient() {
1642 Some(Box::new(inner_id(
1643 backend,
1644 task_id,
1645 cause_self_raw_vc.try_get_type_id(),
1646 visited_set,
1647 )))
1648 } else {
1649 None
1650 }
1651 });
1652 let cause_args = task_type
1653 .arg
1654 .get_raw_vcs()
1655 .into_iter()
1656 .filter_map(|raw_vc| {
1657 let Some(task_id) = raw_vc.try_get_task_id() else {
1658 return None;
1660 };
1661 if !task_id.is_transient() {
1662 return None;
1663 }
1664 Some((task_id, raw_vc.try_get_type_id()))
1665 })
1666 .collect::<IndexSet<_>>() .into_iter()
1668 .map(|(task_id, cell_type_id)| {
1669 inner_id(backend, task_id, cell_type_id, visited_set)
1670 })
1671 .collect();
1672
1673 DebugTraceTransientTask::Cached {
1674 task_name,
1675 cell_type_id,
1676 cause_self,
1677 cause_args,
1678 }
1679 }
1680 inner_cached(
1681 self,
1682 task_type,
1683 cell_id.map(|c| c.type_id),
1684 &mut FxHashSet::default(),
1685 )
1686 }
1687
1688 fn invalidate_task(
1689 &self,
1690 task_id: TaskId,
1691 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1692 ) {
1693 if !self.should_track_dependencies() {
1694 panic!("Dependency tracking is disabled so invalidation is not allowed");
1695 }
1696 operation::InvalidateOperation::run(
1697 smallvec![task_id],
1698 #[cfg(feature = "trace_task_dirty")]
1699 TaskDirtyCause::Invalidator,
1700 self.execute_context(turbo_tasks),
1701 );
1702 }
1703
1704 fn invalidate_tasks(
1705 &self,
1706 tasks: &[TaskId],
1707 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1708 ) {
1709 if !self.should_track_dependencies() {
1710 panic!("Dependency tracking is disabled so invalidation is not allowed");
1711 }
1712 operation::InvalidateOperation::run(
1713 tasks.iter().copied().collect(),
1714 #[cfg(feature = "trace_task_dirty")]
1715 TaskDirtyCause::Unknown,
1716 self.execute_context(turbo_tasks),
1717 );
1718 }
1719
1720 fn invalidate_tasks_set(
1721 &self,
1722 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1723 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1724 ) {
1725 if !self.should_track_dependencies() {
1726 panic!("Dependency tracking is disabled so invalidation is not allowed");
1727 }
1728 operation::InvalidateOperation::run(
1729 tasks.iter().copied().collect(),
1730 #[cfg(feature = "trace_task_dirty")]
1731 TaskDirtyCause::Unknown,
1732 self.execute_context(turbo_tasks),
1733 );
1734 }
1735
1736 fn invalidate_serialization(
1737 &self,
1738 task_id: TaskId,
1739 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1740 ) {
1741 if task_id.is_transient() {
1742 return;
1743 }
1744 let mut ctx = self.execute_context(turbo_tasks);
1745 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1746 task.invalidate_serialization();
1747 }
1748
1749 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1750 let task = self.storage.access_mut(task_id);
1751 if let Some(value) = task.get_persistent_task_type() {
1752 format!("{task_id:?} {}", value)
1753 } else if let Some(value) = task.get_transient_task_type() {
1754 format!("{task_id:?} {}", value)
1755 } else {
1756 format!("{task_id:?} unknown")
1757 }
1758 }
1759
1760 fn get_task_name(
1761 &self,
1762 task_id: TaskId,
1763 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1764 ) -> String {
1765 let mut ctx = self.execute_context(turbo_tasks);
1766 let task = ctx.task(task_id, TaskDataCategory::Data);
1767 if let Some(value) = task.get_persistent_task_type() {
1768 value.to_string()
1769 } else if let Some(value) = task.get_transient_task_type() {
1770 value.to_string()
1771 } else {
1772 "unknown".to_string()
1773 }
1774 }
1775
1776 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1777 let task = self.storage.access_mut(task_id);
1778 task.get_persistent_task_type().cloned()
1779 }
1780
1781 fn task_execution_canceled(
1782 &self,
1783 task_id: TaskId,
1784 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1785 ) {
1786 let mut ctx = self.execute_context(turbo_tasks);
1787 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1788 if let Some(in_progress) = task.take_in_progress() {
1789 match in_progress {
1790 InProgressState::Scheduled {
1791 done_event,
1792 reason: _,
1793 } => done_event.notify(usize::MAX),
1794 InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1795 done_event.notify(usize::MAX)
1796 }
1797 InProgressState::Canceled => {}
1798 }
1799 }
1800 let old = task.set_in_progress(InProgressState::Canceled);
1801 debug_assert!(old.is_none(), "InProgress already exists");
1802 }
1803
    /// Transitions a scheduled task into the in-progress state and builds the
    /// execution spec (span + future) to run it. Returns `None` when the task
    /// is not in a startable state (not scheduled, or a `Once` task that has
    /// already run).
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            if let Some(tasks) = task.prefetch() {
                // Prefetching must happen without holding this task's lock,
                // so drop it and re-acquire afterwards.
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled (already running or canceled): restore the
                // state untouched and refuse to start.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move current collectibles into the outdated set; the new
            // execution re-emits the ones that are still current. Outdated
            // entries without a current counterpart are dropped.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Snapshot current dependency edges as "outdated"; edges not
                // re-read during this execution are cleaned up afterwards.
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the (span, future) pair for the executor. The task lock is no
        // longer held at this point.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can run only once: a second start finds
                    // the mutex empty and bails via `?`.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1903
    /// Called when a task's execution finished. Applies the result and
    /// returns `true` when the task became stale during execution and must be
    /// re-executed (the caller then reruns it).
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Step 1: apply the result to the task's stored state. `None` means
        // the task was detected as stale and has been rescheduled already.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Step 2: the output changed, so mark every task reading it dirty.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        // Step 3: handle newly discovered children — dirty propagation first,
        // then connecting them (which can again detect staleness).
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Step 4: finalize output / in-progress state (last chance to detect
        // staleness).
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Step 5: cleanup. Removed data is dropped here, outside the task
        // locks.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2035
    /// First phase of [`Self::task_execution_completed`]: applies the
    /// execution result to the task's stored state (cell counters, edges,
    /// output diff). Returns `None` when the task became stale during
    /// execution — it is rescheduled in that case and the result discarded.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Canceled while running: discard the result entirely.
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while running, so this
        // result is outdated. Reschedule and throw the result away. `Once`
        // tasks are exempt since their future cannot be re-run.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts;
            // only the not-yet-connected new children must be decremented.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Reconcile the stored per-cell-type max indices with those observed
        // in this execution; entries for cell types no longer used are
        // removed.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        // The task may become immutable if it isn't already, isn't
        // session-dependent, has no invalidator and no collectible
        // dependencies. Collect its dependencies now for the all-immutable
        // check performed after the task lock is dropped.
        let is_immutable = task.immutable();
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Children from previous executions that were not re-added are
            // now outdated edges. (`remove` also shrinks `new_children` to
            // only the actually-new ones.)
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Compute the new output value, but only when it actually differs
        // from the current one — unchanged outputs must not invalidate
        // dependents.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only if every task it depends on is
        // itself immutable already.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2282
    /// Marks every task that reads this task's output as dirty. Large
    /// dependent sets are processed in parallel chunks.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Warm up storage for all dependents before their locks are taken.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        // Makes a single dependent dirty — unless it is a `Once` task or no
        // longer depends on us at all. An outdated (not yet re-read)
        // dependency still marks dirty, but without making an in-progress
        // execution stale.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Fan out across worker threads; each chunk gets its own
            // aggregation queue, executed before the worker finishes.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2379
2380 fn task_execution_completed_unfinished_children_dirty(
2381 &self,
2382 ctx: &mut impl ExecuteContext<'_>,
2383 new_children: &FxHashSet<TaskId>,
2384 ) {
2385 debug_assert!(!new_children.is_empty());
2386
2387 let mut queue = AggregationUpdateQueue::new();
2388 ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
2389 if !child_task.has_output() {
2390 let child_id = child_task.id();
2391 make_task_dirty_internal(
2392 child_task,
2393 child_id,
2394 false,
2395 #[cfg(feature = "trace_task_dirty")]
2396 TaskDirtyCause::InitialDirty,
2397 &mut queue,
2398 ctx,
2399 );
2400 }
2401 });
2402
2403 queue.execute(ctx);
2404 }
2405
    /// Connects the freshly collected `new_children` to `task_id` after execution.
    ///
    /// Returns `true` when the task became stale while running and was rescheduled
    /// instead (the children's active counts are decreased again in that case);
    /// returns `false` when the task was canceled or the children were connected.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Canceled while running: drop the children without connecting them.
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            // The task became stale during execution: reschedule it (carrying over the
            // done event so existing waiters are woken by the re-execution) and
            // decrease the active counts of the collected children.
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2473
    /// Finalizes a completed execution: stores the new output, applies the dirty/clean
    /// state transition, propagates aggregated updates, and notifies waiters.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task must run again (it became stale, or the
    ///   `verify_determinism` feature requests a re-run).
    /// - `in_progress_cells` is the in-progress cell state taken from the task; the
    ///   events of these cells have already been notified.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Children must already have been connected (and drained) at this point.
        debug_assert!(new_children.is_empty());

        if stale && !is_once_task {
            // Stale (non-once) tasks are rescheduled immediately; the done event is
            // carried over so current waiters are woken when the re-execution ends.
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Store the new output (if any); the replaced content is dropped at the end,
        // after the task lock has been released.
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake all readers waiting on cells that were updated during this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Derive the previous self-dirty flags from the stored dirtyness.
        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // Session-dependent tasks stay `SessionDependent` but count as clean for the
        // current session; all other tasks become fully clean after completion.
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // Compute the aggregated dirty/clean delta that must be propagated upwards.
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                // The task got effectively cleaner: wake "all clean" waiters and drop
                // the active-until-clean marker (and the whole activeness state when
                // nothing else keeps it alive).
                if let Some(activeness_state) = task.get_activeness_mut() {
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, persistent tasks whose dirtiness changed or that
        // produced no output are re-run (presumably to detect nondeterministic
        // executions — see the feature's documentation).
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Only signal completion after the task lock has been released.
            done_event.notify(usize::MAX);
        }

        // Drop the replaced output outside of the task lock.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2643
2644 fn task_execution_completed_cleanup(
2645 &self,
2646 ctx: &mut impl ExecuteContext<'_>,
2647 task_id: TaskId,
2648 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2649 is_error: bool,
2650 ) -> Vec<SharedReference> {
2651 let mut task = ctx.task(task_id, TaskDataCategory::All);
2652 let mut removed_cell_data = Vec::new();
2653 if !is_error {
2657 let to_remove_persistent: Vec<_> = task
2664 .iter_persistent_cell_data()
2665 .filter_map(|(cell, _)| {
2666 cell_counters
2667 .get(&cell.type_id)
2668 .is_none_or(|start_index| cell.index >= *start_index)
2669 .then_some(*cell)
2670 })
2671 .collect();
2672
2673 let to_remove_transient: Vec<_> = task
2675 .iter_transient_cell_data()
2676 .filter_map(|(cell, _)| {
2677 cell_counters
2678 .get(&cell.type_id)
2679 .is_none_or(|start_index| cell.index >= *start_index)
2680 .then_some(*cell)
2681 })
2682 .collect();
2683 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2684 for cell in to_remove_persistent {
2685 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2686 removed_cell_data.push(data.into_untyped());
2687 }
2688 }
2689 for cell in to_remove_transient {
2690 if let Some(data) = task.remove_transient_cell_data(&cell) {
2691 removed_cell_data.push(data);
2692 }
2693 }
2694 }
2695
2696 task.cleanup_after_execution();
2700
2701 drop(task);
2702
2703 removed_cell_data
2705 }
2706
    /// Runs a background job for the backend. Currently this handles the periodic
    /// snapshot/persist loop (`InitialSnapshot` / `FollowUpSnapshot`): it waits for
    /// the next snapshot deadline (or an idle period, or shutdown), persists a
    /// snapshot, optionally compacts the backing storage while idle, and schedules
    /// the follow-up job.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `last_snapshot` is stored as milliseconds since `start_time`.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        // The first snapshot waits longer; follow-ups use the regular
                        // interval.
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            // Wait for the deadline, but snapshot early when the system
                            // stays idle for `idle_timeout`, and abort on shutdown.
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        // Idle started: arm the idle deadline and re-arm
                                        // the (one-shot) listener.
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only take the idle shortcut if still idle.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot_and_persist(None, reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // While still idle, run a bounded number of compaction
                            // passes on the backing storage; stop as soon as idling
                            // ends or compaction reports no further work.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                match self.backing_storage.compact() {
                                    Ok(true) => {}
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            if !new_data {
                                // Nothing new was persisted: keep looping in this job
                                // instead of scheduling a follow-up.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2821
2822 fn try_read_own_task_cell(
2823 &self,
2824 task_id: TaskId,
2825 cell: CellId,
2826 options: ReadCellOptions,
2827 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2828 ) -> Result<TypedCellContent> {
2829 let mut ctx = self.execute_context(turbo_tasks);
2830 let task = ctx.task(task_id, TaskDataCategory::Data);
2831 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2832 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2833 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2834 } else {
2835 Ok(CellContent(None).into_typed(cell.type_id))
2836 }
2837 }
2838
    /// Reads the collectibles of `collectible_type` aggregated at `task_id`.
    ///
    /// The task is first promoted to a root aggregation node, then the task's own
    /// collectibles are merged on top of the aggregated ones. When `reader_id` is
    /// given, dependency edges are recorded in both directions so the reader is
    /// invalidated when the collectibles change.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Raise the aggregation number until the task is a root node; the task
            // lock must be released while the aggregation update runs, so re-check in
            // a loop after re-acquiring it.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            for (collectible, count) in task.iter_aggregated_collectibles() {
                // NOTE(review): aggregated entries contribute 1 each rather than
                // `count` — presumably the aggregated count only signals presence;
                // confirm against the aggregation update logic.
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Record the dependency on the reader side as well, unless an outdated
            // entry for the same target was just refreshed.
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2903
2904 fn emit_collectible(
2905 &self,
2906 collectible_type: TraitTypeId,
2907 collectible: RawVc,
2908 task_id: TaskId,
2909 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2910 ) {
2911 self.assert_valid_collectible(task_id, collectible);
2912
2913 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2914 panic!("Collectibles need to be resolved");
2915 };
2916 let cell = CellRef {
2917 task: collectible_task,
2918 cell,
2919 };
2920 operation::UpdateCollectibleOperation::run(
2921 task_id,
2922 CollectibleRef {
2923 collectible_type,
2924 cell,
2925 },
2926 1,
2927 self.execute_context(turbo_tasks),
2928 );
2929 }
2930
2931 fn unemit_collectible(
2932 &self,
2933 collectible_type: TraitTypeId,
2934 collectible: RawVc,
2935 count: u32,
2936 task_id: TaskId,
2937 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2938 ) {
2939 self.assert_valid_collectible(task_id, collectible);
2940
2941 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2942 panic!("Collectibles need to be resolved");
2943 };
2944 let cell = CellRef {
2945 task: collectible_task,
2946 cell,
2947 };
2948 operation::UpdateCollectibleOperation::run(
2949 task_id,
2950 CollectibleRef {
2951 collectible_type,
2952 cell,
2953 },
2954 -(i32::try_from(count).unwrap()),
2955 self.execute_context(turbo_tasks),
2956 );
2957 }
2958
2959 fn update_task_cell(
2960 &self,
2961 task_id: TaskId,
2962 cell: CellId,
2963 is_serializable_cell_content: bool,
2964 content: CellContent,
2965 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
2966 verification_mode: VerificationMode,
2967 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2968 ) {
2969 operation::UpdateCellOperation::run(
2970 task_id,
2971 cell,
2972 content,
2973 is_serializable_cell_content,
2974 updated_key_hashes,
2975 verification_mode,
2976 self.execute_context(turbo_tasks),
2977 );
2978 }
2979
    /// Marks the task's currently running execution as session dependent by setting
    /// the flag on its in-progress state.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking, session-dependent marking is a no-op.
            return;
        }
        // NOTE(review): session-dependent tasks are promoted to a very high
        // aggregation number — presumably so session-based invalidation can locate
        // them efficiently; confirm against the aggregation design docs.
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // The task lock must be released while the aggregation update runs.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3015
3016 fn mark_own_task_as_finished(
3017 &self,
3018 task: TaskId,
3019 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3020 ) {
3021 let mut ctx = self.execute_context(turbo_tasks);
3022 let mut task = ctx.task(task, TaskDataCategory::Data);
3023 if let Some(InProgressState::InProgress(box InProgressStateInner {
3024 marked_as_completed,
3025 ..
3026 })) = task.get_in_progress_mut()
3027 {
3028 *marked_as_completed = true;
3029 }
3034 }
3035
3036 fn connect_task(
3037 &self,
3038 task: TaskId,
3039 parent_task: Option<TaskId>,
3040 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3041 ) {
3042 self.assert_not_persistent_calling_transient(parent_task, task, None);
3043 ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
3044 }
3045
3046 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3047 let task_id = self.transient_task_id_factory.get();
3048 {
3049 let mut task = self.storage.access_mut(task_id);
3050 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3051 }
3052 #[cfg(feature = "verify_aggregation_graph")]
3053 self.root_tasks.lock().insert(task_id);
3054 task_id
3055 }
3056
    /// Disposes a root task.
    ///
    /// If the task or its subtree is still dirty, it loses its root status but stays
    /// active until everything settled; otherwise its activeness state is removed and
    /// "all clean" waiters are woken.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                // No longer a root, but keep it active until the subtree is clean.
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Already clean: wake everyone waiting for the subtree to settle.
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3081
    /// Debug check (feature `verify_aggregation_graph`): walks the task graph from
    /// every root task and validates aggregation invariants, panicking with a
    /// diagnostic dump when one is violated. When `idle` is set, the walk aborts as
    /// soon as the system stops being idle.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        // Runtime escape hatch to disable the (expensive) verification.
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        // `use` items are in scope for the whole function body, including the check
        // above this line.
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over the children graph starting at this root.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen emitted by some task, aggregating tasks listing it)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    // Progress output for very large graphs.
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    // Idle-only verification: abort once work has started again.
                    return;
                }

                // Invariant: every reachable task must report upward to at least one
                // already-discovered aggregating node.
                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which aggregating tasks list each collectible.
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // Invariant: every emitted collectible must appear in some upper task.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // Invariant: a dirty task must be listed as a dirty container in every
                // upper task.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any collectible listed in aggregated state but never actually emitted by
            // a visited task indicates a stale upper connection; dump the upper chain
            // to stdout before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    // Walk the upper graph from the collectible's emitting task.
                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3270
3271 fn assert_not_persistent_calling_transient(
3272 &self,
3273 parent_id: Option<TaskId>,
3274 child_id: TaskId,
3275 cell_id: Option<CellId>,
3276 ) {
3277 if let Some(parent_id) = parent_id
3278 && !parent_id.is_transient()
3279 && child_id.is_transient()
3280 {
3281 self.panic_persistent_calling_transient(
3282 self.debug_get_task_description(parent_id),
3283 self.debug_get_cached_task_type(child_id).as_deref(),
3284 cell_id,
3285 );
3286 }
3287 }
3288
3289 fn panic_persistent_calling_transient(
3290 &self,
3291 parent: String,
3292 child: Option<&CachedTaskType>,
3293 cell_id: Option<CellId>,
3294 ) {
3295 let transient_reason = if let Some(child) = child {
3296 Cow::Owned(format!(
3297 " The callee is transient because it depends on:\n{}",
3298 self.debug_trace_transient_task(child, cell_id),
3299 ))
3300 } else {
3301 Cow::Borrowed("")
3302 };
3303 panic!(
3304 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3305 parent,
3306 child.map_or("unknown", |t| t.get_name()),
3307 transient_reason,
3308 );
3309 }
3310
3311 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3312 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3314 let task_info = if let Some(col_task_ty) = collectible
3316 .try_get_task_id()
3317 .map(|t| self.debug_get_task_description(t))
3318 {
3319 Cow::Owned(format!(" (return type of {col_task_ty})"))
3320 } else {
3321 Cow::Borrowed("")
3322 };
3323 panic!("Collectible{task_info} must be a ResolvedVc")
3324 };
3325 if col_task_id.is_transient() && !task_id.is_transient() {
3326 let transient_reason =
3327 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3328 Cow::Owned(format!(
3329 ". The collectible is transient because it depends on:\n{}",
3330 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3331 ))
3332 } else {
3333 Cow::Borrowed("")
3334 };
3335 panic!(
3337 "Collectible is transient, transient collectibles cannot be emitted from \
3338 persistent tasks{transient_reason}",
3339 )
3340 }
3341 }
3342}
3343
3344impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
3345 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3346 self.0.startup(turbo_tasks);
3347 }
3348
3349 fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3350 self.0.stopping();
3351 }
3352
3353 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3354 self.0.stop(turbo_tasks);
3355 }
3356
3357 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3358 self.0.idle_start(turbo_tasks);
3359 }
3360
3361 fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3362 self.0.idle_end();
3363 }
3364
3365 fn get_or_create_persistent_task(
3366 &self,
3367 task_type: CachedTaskType,
3368 parent_task: Option<TaskId>,
3369 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3370 ) -> TaskId {
3371 self.0
3372 .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
3373 }
3374
3375 fn get_or_create_transient_task(
3376 &self,
3377 task_type: CachedTaskType,
3378 parent_task: Option<TaskId>,
3379 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3380 ) -> TaskId {
3381 self.0
3382 .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
3383 }
3384
3385 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3386 self.0.invalidate_task(task_id, turbo_tasks);
3387 }
3388
3389 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3390 self.0.invalidate_tasks(tasks, turbo_tasks);
3391 }
3392
3393 fn invalidate_tasks_set(
3394 &self,
3395 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3396 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3397 ) {
3398 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3399 }
3400
3401 fn invalidate_serialization(
3402 &self,
3403 task_id: TaskId,
3404 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3405 ) {
3406 self.0.invalidate_serialization(task_id, turbo_tasks);
3407 }
3408
3409 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3410 self.0.task_execution_canceled(task, turbo_tasks)
3411 }
3412
3413 fn try_start_task_execution(
3414 &self,
3415 task_id: TaskId,
3416 priority: TaskPriority,
3417 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3418 ) -> Option<TaskExecutionSpec<'_>> {
3419 self.0
3420 .try_start_task_execution(task_id, priority, turbo_tasks)
3421 }
3422
3423 fn task_execution_completed(
3424 &self,
3425 task_id: TaskId,
3426 result: Result<RawVc, TurboTasksExecutionError>,
3427 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3428 #[cfg(feature = "verify_determinism")] stateful: bool,
3429 has_invalidator: bool,
3430 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3431 ) -> bool {
3432 self.0.task_execution_completed(
3433 task_id,
3434 result,
3435 cell_counters,
3436 #[cfg(feature = "verify_determinism")]
3437 stateful,
3438 has_invalidator,
3439 turbo_tasks,
3440 )
3441 }
3442
3443 type BackendJob = TurboTasksBackendJob;
3444
3445 fn run_backend_job<'a>(
3446 &'a self,
3447 job: Self::BackendJob,
3448 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3449 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3450 self.0.run_backend_job(job, turbo_tasks)
3451 }
3452
    /// Tries to read a task's output. The outer `Result` is a hard error; the
    /// inner `Err(EventListener)` means the value is not ready yet and the
    /// listener resolves when it becomes available.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3463
    /// Tries to read a cell of another task; `Err(EventListener)` in the inner
    /// result means "not ready yet".
    ///
    /// NOTE: the inner backend takes `reader` before `cell`, so the argument
    /// order below intentionally differs from this method's signature.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3475
    /// Reads a cell of the current task itself (no reader/dependency
    /// registration parameter). Forwards to the inner backend.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }
3486
    /// Reads the collectibles of the given trait type emitted by `task_id`,
    /// returned as a map from collectible to its count. Forwards to the inner
    /// backend.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3497
    /// Emits a collectible from `task_id`. Forwards to the inner backend.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }
3508
    /// Retracts `count` emissions of a collectible from `task_id`. Forwards to
    /// the inner backend.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }
3520
    /// Stores new content into a cell of `task_id`. All flags (serializability,
    /// updated key hashes, verification mode) are forwarded unchanged to the
    /// inner backend.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            turbo_tasks,
        );
    }
3541
    /// Marks the current task as finished. Forwards to the inner backend.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }
3549
    /// Marks the current task as session dependent. Forwards to the inner
    /// backend.
    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }
3557
    /// Connects `task` as a child of `parent_task` (or as a root when `None`).
    /// Forwards to the inner backend.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }
3566
    /// Creates a transient (non-persisted) task. The `turbo_tasks` handle is
    /// intentionally unused here — the inner backend creates the task without
    /// it.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }
3574
    /// Disposes a root task. Forwards to the inner backend.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }
3578
    /// Exposes the inner backend's task statistics API (direct field access,
    /// no delegation call).
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
3582
    /// Whether dependency tracking is enabled, read from the inner backend's
    /// options.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
3586
    /// Returns a human-readable name for the task. Forwards to the inner
    /// backend.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3590}
3591
/// A node in a debug trace of transient tasks, rendered as an indented tree
/// via `fmt_indented` / `Display`.
enum DebugTraceTransientTask {
    /// A known cached task, including the traces that caused it
    /// (`cause_self` for the task itself, `cause_args` for its arguments).
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A known cached task whose cause subtree was collapsed (rendered with a
    /// "(collapsed)" marker and no children).
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A transient task whose identity is unknown; only the read cell type (if
    /// any) is recorded.
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3608
3609impl DebugTraceTransientTask {
3610 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3611 let indent = " ".repeat(level);
3612 f.write_str(&indent)?;
3613
3614 fn fmt_cell_type_id(
3615 f: &mut fmt::Formatter<'_>,
3616 cell_type_id: Option<ValueTypeId>,
3617 ) -> fmt::Result {
3618 if let Some(ty) = cell_type_id {
3619 write!(
3620 f,
3621 " (read cell of type {})",
3622 get_value_type(ty).ty.global_name
3623 )
3624 } else {
3625 Ok(())
3626 }
3627 }
3628
3629 match self {
3631 Self::Cached {
3632 task_name,
3633 cell_type_id,
3634 ..
3635 }
3636 | Self::Collapsed {
3637 task_name,
3638 cell_type_id,
3639 ..
3640 } => {
3641 f.write_str(task_name)?;
3642 fmt_cell_type_id(f, *cell_type_id)?;
3643 if matches!(self, Self::Collapsed { .. }) {
3644 f.write_str(" (collapsed)")?;
3645 }
3646 }
3647 Self::Uncached { cell_type_id } => {
3648 f.write_str("unknown transient task")?;
3649 fmt_cell_type_id(f, *cell_type_id)?;
3650 }
3651 }
3652 f.write_char('\n')?;
3653
3654 if let Self::Cached {
3656 cause_self,
3657 cause_args,
3658 ..
3659 } = self
3660 {
3661 if let Some(c) = cause_self {
3662 writeln!(f, "{indent} self:")?;
3663 c.fmt_indented(f, level + 1)?;
3664 }
3665 if !cause_args.is_empty() {
3666 writeln!(f, "{indent} args:")?;
3667 for c in cause_args {
3668 c.fmt_indented(f, level + 1)?;
3669 }
3670 }
3671 }
3672 Ok(())
3673 }
3674}
3675
3676impl fmt::Display for DebugTraceTransientTask {
3677 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3678 self.fmt_indented(f, 0)
3679 }
3680}
3681
3682fn far_future() -> Instant {
3684 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3689}
3690
3691fn encode_task_data(
3696 task: TaskId,
3697 data: &TaskStorage,
3698 category: SpecificTaskDataCategory,
3699 scratch_buffer: &mut TurboBincodeBuffer,
3700) -> Result<TurboBincodeBuffer> {
3701 scratch_buffer.clear();
3702 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3703 data.encode(category, &mut encoder)?;
3704
3705 if cfg!(feature = "verify_serialization") {
3706 TaskStorage::new()
3707 .decode(
3708 category,
3709 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3710 )
3711 .with_context(|| {
3712 format!(
3713 "expected to be able to decode serialized data for '{category:?}' information \
3714 for {task}"
3715 )
3716 })?;
3717 }
3718 Ok(SmallVec::from_slice(scratch_buffer))
3719}