1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33 backend::{
34 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37 VerificationMode,
38 },
39 event::{Event, EventDescription, EventListener},
40 message_queue::{TimingEvent, TraceEvent},
41 registry::get_value_type,
42 scope::scope_and_block,
43 task_statistics::TaskStatisticsApi,
44 trace::TraceRawVcs,
45 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49 operation::AnyOperation,
50 storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55 backend::{
56 operation::{
57 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60 TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61 is_root_node, make_task_dirty_internal, prepare_new_children,
62 },
63 storage::Storage,
64 storage_schema::{TaskStorage, TaskStorageAccessors},
65 },
66 backing_storage::BackingStorage,
67 data::{
68 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69 InProgressState, InProgressStateInner, OutputValue, TransientTask,
70 },
71 error::TaskError,
72 utils::{
73 arc_or_owned::ArcOrOwned,
74 chunked_vec::ChunkedVec,
75 dash_map_drop_contents::drop_contents,
76 dash_map_raw_entry::{RawEntry, raw_entry},
77 ptr_eq_arc::PtrEqArc,
78 shard_amount::compute_shard_amount,
79 sharded::Sharded,
80 swap_retain,
81 },
82};
83
/// When marking dependent tasks dirty, batches above this size are split into
/// chunks and processed in parallel.
// NOTE(review): name contains a typo ("PARALLIZATION" -> "PARALLELIZATION");
// kept as-is because code outside this view may reference it.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Highest bit of `in_progress_operations`: set while a snapshot has been
/// requested and in-flight operations must suspend at their suspend points.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
90
/// Idle time that must elapse before a snapshot is triggered.
///
/// Configurable in milliseconds via the
/// `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable;
/// falls back to 2 seconds when unset or unparsable.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|value| value.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
100
/// Coordination state between a snapshot and in-flight operations.
/// Guarded by the `snapshot_request` mutex on [`TurboTasksBackendInner`].
struct SnapshotRequest {
    /// True while a snapshot is being taken; operations must suspend until it
    /// is reset and `snapshot_completed` is signalled.
    snapshot_requested: bool,
    /// Operations that suspended themselves for the current snapshot; the
    /// snapshot code collects these while all operations are paused.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
105
106impl SnapshotRequest {
107 fn new() -> Self {
108 Self {
109 snapshot_requested: false,
110 suspended_operations: FxHashSet::default(),
111 }
112 }
113}
114
/// Controls how the backend uses the backing storage.
pub enum StorageMode {
    /// Data is restored from storage, but modifications are not persisted.
    ReadOnly,
    /// Data is restored from storage and modifications are persisted.
    ReadWrite,
    /// Like `ReadWrite`, but persisting is presumably deferred until shutdown
    /// (name-based; the distinction is not visible in this part of the file).
    ReadWriteOnShutdown,
}
125
/// Configuration for [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Whether dependency edges between tasks are tracked. Disabling this
    /// also forces `active_tracking` off (see `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    /// Whether task activeness is tracked. Only effective when
    /// `dependency_tracking` is enabled.
    pub active_tracking: bool,

    /// How (and whether) the backing storage is used; `None` disables both
    /// restoring and persisting.
    pub storage_mode: Option<StorageMode>,

    /// Worker count hint, passed to `compute_shard_amount`.
    pub num_workers: Option<usize>,

    /// Use smaller preallocations; passed to `compute_shard_amount` and
    /// `Storage::new`.
    pub small_preallocation: bool,
}
150
151impl Default for BackendOptions {
152 fn default() -> Self {
153 Self {
154 dependency_tracking: true,
155 active_tracking: true,
156 storage_mode: Some(StorageMode::ReadWrite),
157 num_workers: None,
158 small_preallocation: false,
159 }
160 }
161}
162
/// Background jobs the backend schedules on the turbo-tasks runtime.
/// Both variants relate to snapshotting; `InitialSnapshot` is presumably the
/// first scheduled snapshot and `FollowUpSnapshot` the re-scheduled ones
/// (the job handler is not visible in this part of the file).
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
167
/// Public backend type: a shared handle around [`TurboTasksBackendInner`].
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded log of newly cached persistent tasks (task type -> assigned id);
/// drained when a snapshot is taken.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
/// Shared state of the backend; wrapped in an `Arc` by [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    /// Time the backend was created.
    start_time: Instant,

    /// Id factory for persistent tasks: ids below `TRANSIENT_TASK_BIT`,
    /// starting at the backing storage's next free id.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks: ids with `TRANSIENT_TASK_BIT` set.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of newly cached persistent tasks; `None` when persisting is
    /// disabled (see `should_persist`).
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// In-memory cache mapping task types to their assigned task ids.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    storage: Storage,

    /// True when a previous session already persisted tasks (the next free
    /// task id is not `TaskId::MIN`), i.e. the in-memory state is only a
    /// partial view of the persisted graph.
    local_is_partial: AtomicBool,

    /// Count of in-flight operations; the highest bit is
    /// `SNAPSHOT_REQUESTED_BIT` and flags a pending snapshot.
    in_progress_operations: AtomicUsize,

    /// Snapshot coordination state; see [`SnapshotRequest`].
    snapshot_request: Mutex<SnapshotRequest>,
    /// Signalled when the last active operation has suspended.
    operations_suspended: Condvar,
    /// Signalled when the snapshot finished and operations may resume.
    snapshot_completed: Condvar,
    // NOTE(review): appears to record when the last snapshot happened
    // (initialized to 0); confirm the encoding where it is written.
    last_snapshot: AtomicU64,

    /// Set when the backend starts shutting down.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Cache hit/miss statistics per native function.
    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
220
221impl<B: BackingStorage> TurboTasksBackend<B> {
222 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
223 Self(Arc::new(TurboTasksBackendInner::new(
224 options,
225 backing_storage,
226 )))
227 }
228
229 pub fn backing_storage(&self) -> &B {
230 &self.0.backing_storage
231 }
232}
233
234impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Queries the backing storage for the next free task id (before the
    /// storage is moved into the struct) so newly issued persistent ids do
    /// not collide with previously persisted ones.
    ///
    /// # Panics
    ///
    /// Panics if the backing storage fails to provide the next free task id.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // The task cache log is only needed when modifications get persisted.
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        // Activeness tracking is meaningless without dependency tracking.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persistent ids: [next_task_id, TRANSIENT_TASK_BIT).
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            // Transient ids: [TRANSIENT_TASK_BIT, TaskId::MAX].
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            // A non-MIN starting id means a previous session persisted tasks,
            // so the local in-memory state is only a partial view.
            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }
280
    /// Creates an [`ExecuteContext`] bound to this backend and the given
    /// turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// Returns true when a snapshot has been requested, i.e. running
    /// operations should suspend at their next suspend point. Always false
    /// when persisting is disabled (no snapshots are taken then).
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }
292
    /// Suspend point for long-running operations: when a snapshot has been
    /// requested, captures the operation's resumable state via `suspend`,
    /// registers it, and blocks until the snapshot completed.
    ///
    /// The common path is a single cheap check; `suspend` is only invoked
    /// when suspension is actually required.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the snapshot may have finished between
            // the unsynchronized check and acquiring the mutex.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Deregister from the active-operation count. The snapshot
                // waits until only SNAPSHOT_REQUESTED_BIT remains.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    // We were the last active operation; wake the snapshot.
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Resume: count ourselves as active again and deregister the
                // suspended state.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }
325
    /// Registers a new in-flight operation and returns a guard that
    /// deregisters it on drop.
    ///
    /// If a snapshot is currently being taken, blocks until it completed
    /// before letting the operation start.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            // No snapshots without persisting, so no tracking is needed.
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            // A snapshot is (or was) requested; re-check under the lock.
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                // Back out of the increment; if we were the last counted
                // operation, wake the waiting snapshot thread.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Snapshot is done; count ourselves as active again.
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }
349
350 fn should_persist(&self) -> bool {
351 matches!(
352 self.options.storage_mode,
353 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
354 )
355 }
356
357 fn should_restore(&self) -> bool {
358 self.options.storage_mode.is_some()
359 }
360
361 fn should_track_dependencies(&self) -> bool {
362 self.options.dependency_tracking
363 }
364
365 fn should_track_activeness(&self) -> bool {
366 self.options.active_tracking
367 }
368
369 fn track_cache_hit(&self, task_type: &CachedTaskType) {
370 self.task_statistics
371 .map(|stats| stats.increment_cache_hit(task_type.native_fn));
372 }
373
374 fn track_cache_miss(&self, task_type: &CachedTaskType) {
375 self.task_statistics
376 .map(|stats| stats.increment_cache_miss(task_type.native_fn));
377 }
378
    /// Converts a stored [`TaskError`] into a [`TurboTasksExecutionError`],
    /// recursively converting error sources.
    ///
    /// For `TaskError::TaskChain`, the innermost error is looked up from the
    /// last task in the chain (it may have been dropped in the meantime, in
    /// which case a placeholder panic message is used) and then wrapped in
    /// one `TaskContext` layer per chain entry, innermost last.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                let task_id = chain.last().unwrap();
                // Look up the actual error on the last task of the chain.
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // The task's output changed or was dropped; report a
                        // placeholder instead of the original error.
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap the error in one context layer per task in the chain,
                // from innermost (last) to outermost (first).
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
441}
442
/// Guard for an in-flight operation; its `Drop` impl deregisters the
/// operation from `in_progress_operations`. `backend` is `None` when
/// persisting is disabled and no tracking is needed.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
446
impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    /// Deregisters the operation. If this was the last active operation while
    /// a snapshot is requested (only `SNAPSHOT_REQUESTED_BIT` remains in the
    /// counter), wakes the waiting snapshot thread.
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}
459
/// Intermediate result assembled while completing a task execution.
// NOTE(review): the consumer of this struct is outside this view; field
// comments below are based on field names and types — confirm at use site.
struct TaskExecutionCompletePrepareResult {
    // Children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    // Whether the task can be treated as immutable from now on.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // The new output value, if the output changed.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
469
470impl<B: BackingStorage> TurboTasksBackendInner<B> {
472 fn connect_child(
473 &self,
474 parent_task: Option<TaskId>,
475 child_task: TaskId,
476 task_type: Option<ArcOrOwned<CachedTaskType>>,
477 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
478 ) {
479 operation::ConnectChildOperation::run(
480 parent_task,
481 child_task,
482 task_type,
483 self.execute_context(turbo_tasks),
484 );
485 }
486
487 fn try_read_task_output(
488 self: &Arc<Self>,
489 task_id: TaskId,
490 reader: Option<TaskId>,
491 options: ReadOutputOptions,
492 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
493 ) -> Result<Result<RawVc, EventListener>> {
494 self.assert_not_persistent_calling_transient(reader, task_id, None);
495
496 let mut ctx = self.execute_context(turbo_tasks);
497 let need_reader_task = if self.should_track_dependencies()
498 && !matches!(options.tracking, ReadTracking::Untracked)
499 && reader.is_some_and(|reader_id| reader_id != task_id)
500 && let Some(reader_id) = reader
501 && reader_id != task_id
502 {
503 Some(reader_id)
504 } else {
505 None
506 };
507 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
508 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
512 (task, Some(reader))
513 } else {
514 (ctx.task(task_id, TaskDataCategory::All), None)
515 };
516
517 fn listen_to_done_event(
518 reader_description: Option<EventDescription>,
519 tracking: ReadTracking,
520 done_event: &Event,
521 ) -> EventListener {
522 done_event.listen_with_note(move || {
523 move || {
524 if let Some(reader_description) = reader_description.as_ref() {
525 format!(
526 "try_read_task_output from {} ({})",
527 reader_description, tracking
528 )
529 } else {
530 format!("try_read_task_output ({})", tracking)
531 }
532 }
533 })
534 }
535
536 fn check_in_progress(
537 task: &impl TaskGuard,
538 reader_description: Option<EventDescription>,
539 tracking: ReadTracking,
540 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
541 {
542 match task.get_in_progress() {
543 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
544 listen_to_done_event(reader_description, tracking, done_event),
545 ))),
546 Some(InProgressState::InProgress(box InProgressStateInner {
547 done_event, ..
548 })) => Some(Ok(Err(listen_to_done_event(
549 reader_description,
550 tracking,
551 done_event,
552 )))),
553 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
554 "{} was canceled",
555 task.get_task_description()
556 ))),
557 None => None,
558 }
559 }
560
561 if matches!(options.consistency, ReadConsistency::Strong) {
562 loop {
564 let aggregation_number = get_aggregation_number(&task);
565 if is_root_node(aggregation_number) {
566 break;
567 }
568 drop(task);
569 drop(reader_task);
570 {
571 let _span = tracing::trace_span!(
572 "make root node for strongly consistent read",
573 %task_id
574 )
575 .entered();
576 AggregationUpdateQueue::run(
577 AggregationUpdateJob::UpdateAggregationNumber {
578 task_id,
579 base_aggregation_number: u32::MAX,
580 distance: None,
581 },
582 &mut ctx,
583 );
584 }
585 (task, reader_task) = if let Some(reader_id) = need_reader_task {
586 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
588 (task, Some(reader))
589 } else {
590 (ctx.task(task_id, TaskDataCategory::All), None)
591 }
592 }
593
594 let is_dirty = task.is_dirty();
595
596 let has_dirty_containers = task.has_dirty_containers();
598 if has_dirty_containers || is_dirty.is_some() {
599 let activeness = task.get_activeness_mut();
600 let mut task_ids_to_schedule: Vec<_> = Vec::new();
601 let activeness = if let Some(activeness) = activeness {
603 activeness.set_active_until_clean();
607 activeness
608 } else {
609 if ctx.should_track_activeness() {
613 task_ids_to_schedule = task.dirty_containers().collect();
615 task_ids_to_schedule.push(task_id);
616 }
617 let activeness =
618 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
619 activeness.set_active_until_clean();
620 activeness
621 };
622 let listener = activeness.all_clean_event.listen_with_note(move || {
623 let this = self.clone();
624 let tt = turbo_tasks.pin();
625 move || {
626 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
627 let mut ctx = this.execute_context(tt);
628 let mut visited = FxHashSet::default();
629 fn indent(s: &str) -> String {
630 s.split_inclusive('\n')
631 .flat_map(|line: &str| [" ", line].into_iter())
632 .collect::<String>()
633 }
634 fn get_info(
635 ctx: &mut impl ExecuteContext<'_>,
636 task_id: TaskId,
637 parent_and_count: Option<(TaskId, i32)>,
638 visited: &mut FxHashSet<TaskId>,
639 ) -> String {
640 let task = ctx.task(task_id, TaskDataCategory::All);
641 let is_dirty = task.is_dirty();
642 let in_progress =
643 task.get_in_progress()
644 .map_or("not in progress", |p| match p {
645 InProgressState::InProgress(_) => "in progress",
646 InProgressState::Scheduled { .. } => "scheduled",
647 InProgressState::Canceled => "canceled",
648 });
649 let activeness = task.get_activeness().map_or_else(
650 || "not active".to_string(),
651 |activeness| format!("{activeness:?}"),
652 );
653 let aggregation_number = get_aggregation_number(&task);
654 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
655 {
656 let uppers = get_uppers(&task);
657 !uppers.contains(&parent_task_id)
658 } else {
659 false
660 };
661
662 let has_dirty_containers = task.has_dirty_containers();
664
665 let task_description = task.get_task_description();
666 let is_dirty_label = if let Some(parent_priority) = is_dirty {
667 format!(", dirty({parent_priority})")
668 } else {
669 String::new()
670 };
671 let has_dirty_containers_label = if has_dirty_containers {
672 ", dirty containers"
673 } else {
674 ""
675 };
676 let count = if let Some((_, count)) = parent_and_count {
677 format!(" {count}")
678 } else {
679 String::new()
680 };
681 let mut info = format!(
682 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
683 {in_progress}, \
684 {activeness}{is_dirty_label}{has_dirty_containers_label})",
685 );
686 let children: Vec<_> = task.dirty_containers_with_count().collect();
687 drop(task);
688
689 if missing_upper {
690 info.push_str("\n ERROR: missing upper connection");
691 }
692
693 if has_dirty_containers || !children.is_empty() {
694 writeln!(info, "\n dirty tasks:").unwrap();
695
696 for (child_task_id, count) in children {
697 let task_description = ctx
698 .task(child_task_id, TaskDataCategory::Data)
699 .get_task_description();
700 if visited.insert(child_task_id) {
701 let child_info = get_info(
702 ctx,
703 child_task_id,
704 Some((task_id, count)),
705 visited,
706 );
707 info.push_str(&indent(&child_info));
708 if !info.ends_with('\n') {
709 info.push('\n');
710 }
711 } else {
712 writeln!(
713 info,
714 " {child_task_id} {task_description} {count} \
715 (already visited)"
716 )
717 .unwrap();
718 }
719 }
720 }
721 info
722 }
723 let info = get_info(&mut ctx, task_id, None, &mut visited);
724 format!(
725 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
726 )
727 }
728 });
729 drop(reader_task);
730 drop(task);
731 if !task_ids_to_schedule.is_empty() {
732 let mut queue = AggregationUpdateQueue::new();
733 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
734 queue.execute(&mut ctx);
735 }
736
737 return Ok(Err(listener));
738 }
739 }
740
741 let reader_description = reader_task
742 .as_ref()
743 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
744 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
745 {
746 return value;
747 }
748
749 if let Some(output) = task.get_output() {
750 let result = match output {
751 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
752 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
753 OutputValue::Error(error) => Err(error.clone()),
754 };
755 if let Some(mut reader_task) = reader_task.take()
756 && options.tracking.should_track(result.is_err())
757 && (!task.immutable() || cfg!(feature = "verify_immutable"))
758 {
759 #[cfg(feature = "trace_task_output_dependencies")]
760 let _span = tracing::trace_span!(
761 "add output dependency",
762 task = %task_id,
763 dependent_task = ?reader
764 )
765 .entered();
766 let mut queue = LeafDistanceUpdateQueue::new();
767 let reader = reader.unwrap();
768 if task.add_output_dependent(reader) {
769 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
771 let reader_leaf_distance =
772 reader_task.get_leaf_distance().copied().unwrap_or_default();
773 if reader_leaf_distance.distance <= leaf_distance.distance {
774 queue.push(
775 reader,
776 leaf_distance.distance,
777 leaf_distance.max_distance_in_buffer,
778 );
779 }
780 }
781
782 drop(task);
783
784 if !reader_task.remove_outdated_output_dependencies(&task_id) {
790 let _ = reader_task.add_output_dependencies(task_id);
791 }
792 drop(reader_task);
793
794 queue.execute(&mut ctx);
795 } else {
796 drop(task);
797 }
798
799 return result.map_err(|error| {
800 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
801 .with_task_context(task_id, turbo_tasks.pin())
802 .into()
803 });
804 }
805 drop(reader_task);
806
807 let note = EventDescription::new(|| {
808 move || {
809 if let Some(reader) = reader_description.as_ref() {
810 format!("try_read_task_output (recompute) from {reader}",)
811 } else {
812 "try_read_task_output (recompute, untracked)".to_string()
813 }
814 }
815 });
816
817 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
819 TaskExecutionReason::OutputNotAvailable,
820 EventDescription::new(|| task.get_task_desc_fn()),
821 note,
822 );
823
824 let old = task.set_in_progress(in_progress_state);
827 debug_assert!(old.is_none(), "InProgress already exists");
828 ctx.schedule_task(task, TaskPriority::Initial);
829
830 Ok(Err(listener))
831 }
832
    /// Tries to read cell `cell` of task `task_id`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the caller must wait for the task to
    /// (re)compute the cell, and `Err(_)` when the cell no longer exists or
    /// the task was canceled.
    ///
    /// With `final_read_hint` the content is removed from storage instead of
    /// cloned. Tracked reads record a cell dependency edge from the reader.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the dependency edge in both directions: (cell, key, reader)
        // on the read task and (target cell, key) on the reader. No-op when
        // there is no reader task or the read task is immutable (unless
        // immutability is being verified).
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // Keep the forward edge on the reader unless it was already
                // present as an outdated dependency.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // A reader task guard is only needed for tracked reads from a
        // different task.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the content out of storage instead of
        // cloning it.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // Content is missing. If the task is already computing, just wait.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Distinguish "cell will reappear after recompute" from "cell is
        // gone": an index at or beyond the max ever used for this type means
        // the cell no longer exists.
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Register a listener; if one already existed, the recompute was
        // already triggered by another reader.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).global_name,
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Schedule the task so it recomputes the missing cell.
        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
970
    /// Returns a listener for the in-progress state of `cell` on `task`.
    ///
    /// The boolean is `true` when a new in-progress cell state was created by
    /// this call (i.e. the caller is responsible for triggering the
    /// recomputation) and `false` when one already existed.
    fn listen_to_cell(
        &self,
        task: &mut impl TaskGuard,
        task_id: TaskId,
        reader_task: &Option<impl TaskGuard>,
        cell: CellId,
    ) -> (EventListener, bool) {
        // Tracing note describing who is waiting; only evaluated on demand.
        let note = || {
            let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
                } else {
                    "try_read_task_cell (in progress, untracked)".to_string()
                }
            }
        };
        if let Some(in_progress) = task.get_in_progress_cells(&cell) {
            // Someone is already waiting on this cell; reuse its event.
            let listener = in_progress.event.listen_with_note(note);
            return (listener, false);
        }
        let in_progress = InProgressCellState::new(task_id, cell);
        let listener = in_progress.event.listen_with_note(note);
        let old = task.insert_in_progress_cells(cell, in_progress);
        debug_assert!(old.is_none(), "InProgressCell already exists");
        (listener, true)
    }
999
1000 fn snapshot_and_persist(
1001 &self,
1002 parent_span: Option<tracing::Id>,
1003 reason: &str,
1004 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1005 ) -> Option<(Instant, bool)> {
1006 let snapshot_span =
1007 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1008 .entered();
1009 let start = Instant::now();
1010 let wall_start = SystemTime::now();
1014 debug_assert!(self.should_persist());
1015
1016 let suspended_operations;
1017 {
1018 let _span = tracing::info_span!("blocking").entered();
1019 let mut snapshot_request = self.snapshot_request.lock();
1020 snapshot_request.snapshot_requested = true;
1021 let active_operations = self
1022 .in_progress_operations
1023 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1024 if active_operations != 0 {
1025 self.operations_suspended
1026 .wait_while(&mut snapshot_request, |_| {
1027 self.in_progress_operations.load(Ordering::Relaxed)
1028 != SNAPSHOT_REQUESTED_BIT
1029 });
1030 }
1031 suspended_operations = snapshot_request
1032 .suspended_operations
1033 .iter()
1034 .map(|op| op.arc().clone())
1035 .collect::<Vec<_>>();
1036 }
1037 self.storage.start_snapshot();
1038 let mut persisted_task_cache_log = self
1039 .persisted_task_cache_log
1040 .as_ref()
1041 .map(|l| l.take(|i| i))
1042 .unwrap_or_default();
1043 let mut snapshot_request = self.snapshot_request.lock();
1044 snapshot_request.snapshot_requested = false;
1045 self.in_progress_operations
1046 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1047 self.snapshot_completed.notify_all();
1048 let snapshot_time = Instant::now();
1049 drop(snapshot_request);
1050
1051 #[cfg(feature = "print_cache_item_size")]
1052 #[derive(Default)]
1053 struct TaskCacheStats {
1054 data: usize,
1055 data_compressed: usize,
1056 data_count: usize,
1057 meta: usize,
1058 meta_compressed: usize,
1059 meta_count: usize,
1060 upper_count: usize,
1061 collectibles_count: usize,
1062 aggregated_collectibles_count: usize,
1063 children_count: usize,
1064 followers_count: usize,
1065 collectibles_dependents_count: usize,
1066 aggregated_dirty_containers_count: usize,
1067 output_size: usize,
1068 }
1069 #[cfg(feature = "print_cache_item_size")]
1070 impl TaskCacheStats {
1071 fn compressed_size(data: &[u8]) -> Result<usize> {
1072 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1073 data,
1074 &mut Vec::new(),
1075 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1076 )?)
1077 }
1078
1079 fn add_data(&mut self, data: &[u8]) {
1080 self.data += data.len();
1081 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1082 self.data_count += 1;
1083 }
1084
1085 fn add_meta(&mut self, data: &[u8]) {
1086 self.meta += data.len();
1087 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1088 self.meta_count += 1;
1089 }
1090
1091 fn add_counts(&mut self, storage: &TaskStorage) {
1092 let counts = storage.meta_counts();
1093 self.upper_count += counts.upper;
1094 self.collectibles_count += counts.collectibles;
1095 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1096 self.children_count += counts.children;
1097 self.followers_count += counts.followers;
1098 self.collectibles_dependents_count += counts.collectibles_dependents;
1099 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1100 if let Some(output) = storage.get_output() {
1101 use turbo_bincode::turbo_bincode_encode;
1102
1103 self.output_size += turbo_bincode_encode(&output)
1104 .map(|data| data.len())
1105 .unwrap_or(0);
1106 }
1107 }
1108 }
1109 #[cfg(feature = "print_cache_item_size")]
1110 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1111 Mutex::new(FxHashMap::default());
1112
1113 let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1114 if task_id.is_transient() {
1115 return (None, None);
1116 }
1117
1118 let meta_restored = inner.flags.meta_restored();
1119 let data_restored = inner.flags.data_restored();
1120
1121 let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1123 let data = data_restored.then(|| inner.clone_data_snapshot());
1124
1125 (meta, data)
1126 };
1127 let process = |task_id: TaskId,
1128 (meta, data): (Option<TaskStorage>, Option<TaskStorage>),
1129 buffer: &mut TurboBincodeBuffer| {
1130 #[cfg(feature = "print_cache_item_size")]
1131 if let Some(ref m) = meta {
1132 task_cache_stats
1133 .lock()
1134 .entry(self.get_task_name(task_id, turbo_tasks))
1135 .or_default()
1136 .add_counts(m);
1137 }
1138 (
1139 task_id,
1140 meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
1141 data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
1142 )
1143 };
1144 let process_snapshot =
1145 |task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
1146 if task_id.is_transient() {
1147 return (task_id, None, None);
1148 }
1149
1150 #[cfg(feature = "print_cache_item_size")]
1151 if inner.flags.meta_modified() {
1152 task_cache_stats
1153 .lock()
1154 .entry(self.get_task_name(task_id, turbo_tasks))
1155 .or_default()
1156 .add_counts(&inner);
1157 }
1158
1159 (
1161 task_id,
1162 inner.flags.meta_modified().then(|| {
1163 encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
1164 }),
1165 inner.flags.data_modified().then(|| {
1166 encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
1167 }),
1168 )
1169 };
1170
1171 let snapshot = self
1172 .storage
1173 .take_snapshot(&preprocess, &process, &process_snapshot);
1174
1175 let task_snapshots = snapshot
1176 .into_iter()
1177 .filter_map(|iter| {
1178 let mut iter = iter
1179 .filter_map(
1180 |(task_id, meta, data): (
1181 _,
1182 Option<Result<SmallVec<_>>>,
1183 Option<Result<SmallVec<_>>>,
1184 )| {
1185 let meta = match meta {
1186 Some(Ok(meta)) => {
1187 #[cfg(feature = "print_cache_item_size")]
1188 task_cache_stats
1189 .lock()
1190 .entry(self.get_task_name(task_id, turbo_tasks))
1191 .or_default()
1192 .add_meta(&meta);
1193 Some(meta)
1194 }
1195 None => None,
1196 Some(Err(err)) => {
1197 println!(
1198 "Serializing task {} failed (meta): {:?}",
1199 self.debug_get_task_description(task_id),
1200 err
1201 );
1202 None
1203 }
1204 };
1205 let data = match data {
1206 Some(Ok(data)) => {
1207 #[cfg(feature = "print_cache_item_size")]
1208 task_cache_stats
1209 .lock()
1210 .entry(self.get_task_name(task_id, turbo_tasks))
1211 .or_default()
1212 .add_data(&data);
1213 Some(data)
1214 }
1215 None => None,
1216 Some(Err(err)) => {
1217 println!(
1218 "Serializing task {} failed (data): {:?}",
1219 self.debug_get_task_description(task_id),
1220 err
1221 );
1222 None
1223 }
1224 };
1225 (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1226 },
1227 )
1228 .peekable();
1229 iter.peek().is_some().then_some(iter)
1230 })
1231 .collect::<Vec<_>>();
1232
1233 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1234
1235 drop(snapshot_span);
1236 let snapshot_duration = start.elapsed();
1237 let task_count = task_snapshots.len();
1238
1239 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1240 return Some((snapshot_time, false));
1241 }
1242
1243 let persist_start = Instant::now();
1244 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1245 {
1246 if let Err(err) = self.backing_storage.save_snapshot(
1247 suspended_operations,
1248 persisted_task_cache_log,
1249 task_snapshots,
1250 ) {
1251 eprintln!("Persisting failed: {err:?}");
1252 return None;
1253 }
1254 #[cfg(feature = "print_cache_item_size")]
1255 {
1256 let mut task_cache_stats = task_cache_stats
1257 .into_inner()
1258 .into_iter()
1259 .collect::<Vec<_>>();
1260 if !task_cache_stats.is_empty() {
1261 use turbo_tasks::util::FormatBytes;
1262
1263 use crate::utils::markdown_table::print_markdown_table;
1264
1265 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1266 (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1267 .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1268 });
1269 println!(
1270 "Task cache stats: {} ({})",
1271 FormatBytes(
1272 task_cache_stats
1273 .iter()
1274 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1275 .sum::<usize>()
1276 ),
1277 FormatBytes(
1278 task_cache_stats
1279 .iter()
1280 .map(|(_, s)| s.data + s.meta)
1281 .sum::<usize>()
1282 )
1283 );
1284
1285 print_markdown_table(
1286 [
1287 "Task",
1288 " Total Size",
1289 " Data Size",
1290 " Data Count x Avg",
1291 " Data Count x Avg",
1292 " Meta Size",
1293 " Meta Count x Avg",
1294 " Meta Count x Avg",
1295 " Uppers",
1296 " Coll",
1297 " Agg Coll",
1298 " Children",
1299 " Followers",
1300 " Coll Deps",
1301 " Agg Dirty",
1302 " Output Size",
1303 ],
1304 task_cache_stats.iter(),
1305 |(task_desc, stats)| {
1306 [
1307 task_desc.to_string(),
1308 format!(
1309 " {} ({})",
1310 FormatBytes(stats.data_compressed + stats.meta_compressed),
1311 FormatBytes(stats.data + stats.meta)
1312 ),
1313 format!(
1314 " {} ({})",
1315 FormatBytes(stats.data_compressed),
1316 FormatBytes(stats.data)
1317 ),
1318 format!(" {} x", stats.data_count,),
1319 format!(
1320 "{} ({})",
1321 FormatBytes(
1322 stats
1323 .data_compressed
1324 .checked_div(stats.data_count)
1325 .unwrap_or(0)
1326 ),
1327 FormatBytes(
1328 stats.data.checked_div(stats.data_count).unwrap_or(0)
1329 ),
1330 ),
1331 format!(
1332 " {} ({})",
1333 FormatBytes(stats.meta_compressed),
1334 FormatBytes(stats.meta)
1335 ),
1336 format!(" {} x", stats.meta_count,),
1337 format!(
1338 "{} ({})",
1339 FormatBytes(
1340 stats
1341 .meta_compressed
1342 .checked_div(stats.meta_count)
1343 .unwrap_or(0)
1344 ),
1345 FormatBytes(
1346 stats.meta.checked_div(stats.meta_count).unwrap_or(0)
1347 ),
1348 ),
1349 format!(" {}", stats.upper_count),
1350 format!(" {}", stats.collectibles_count),
1351 format!(" {}", stats.aggregated_collectibles_count),
1352 format!(" {}", stats.children_count),
1353 format!(" {}", stats.followers_count),
1354 format!(" {}", stats.collectibles_dependents_count),
1355 format!(" {}", stats.aggregated_dirty_containers_count),
1356 format!(" {}", FormatBytes(stats.output_size)),
1357 ]
1358 },
1359 );
1360 }
1361 }
1362 }
1363
1364 let elapsed = start.elapsed();
1365 let persist_duration = persist_start.elapsed();
1366 if elapsed > Duration::from_secs(10) {
1368 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1369 "Finished writing to filesystem cache".to_string(),
1370 elapsed,
1371 )));
1372 }
1373
1374 let wall_start_ms = wall_start
1375 .duration_since(SystemTime::UNIX_EPOCH)
1376 .unwrap_or_default()
1377 .as_secs_f64()
1379 * 1000.0;
1380 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1381 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1382 "turbopack-persistence",
1383 wall_start_ms,
1384 wall_end_ms,
1385 vec![
1386 ("reason", serde_json::Value::from(reason)),
1387 (
1388 "snapshot_duration_ms",
1389 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1390 ),
1391 (
1392 "persist_duration_ms",
1393 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1394 ),
1395 ("task_count", serde_json::Value::from(task_count)),
1396 ],
1397 )));
1398
1399 Some((snapshot_time, true))
1400 }
1401
    /// Restores persisted state at startup and kicks off the persistence loop.
    ///
    /// When restoring is enabled, any operations that were left uncompleted by
    /// a previous process are replayed to bring the task graph back into a
    /// consistent state. When the storage mode is read-write, the initial
    /// snapshot background job is scheduled.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Operations interrupted by the previous shutdown must be finished
            // before any new work runs against the restored state.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // NOTE(review): the second `_span` shadows the first binding, but
            // both entered guards stay alive until the end of this block, so
            // the "thread" span is nested inside "persisting background job".
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1428
    /// Marks the backend as stopping and wakes every waiter on the stopping
    /// event so background jobs can wind down promptly.
    fn stopping(&self) {
        // Release ordering publishes the flag before listeners are woken.
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1433
    /// Shuts the backend down: optionally verifies the aggregation graph,
    /// persists a final snapshot, drops in-memory task state, and closes the
    /// backing storage. The order matters: persistence must happen before
    /// contents are dropped.
    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            // Final snapshot so no completed work is lost across restarts.
            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
        }
        drop_contents(&self.task_cache);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            // Best-effort: a failed shutdown is reported but not propagated.
            println!("Shutting down failed: {err}");
        }
    }
1450
    /// Signals that the system became idle. With `verify_aggregation_graph`
    /// enabled, additionally schedules a delayed verification pass that is
    /// cancelled if the idle period ends first.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Wait 5s before verifying; bail out early if idle ends.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag: idle may have ended between the sleep
                // completing and this task being polled.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1478
    /// Signals that the idle period ended, cancelling any pending idle-time
    /// verification (see `idle_start`) and waking idle-end listeners.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1484
    /// Looks up or creates a persistent (cached) task for `task_type`, connects
    /// it as a child of `parent_task`, and returns its id.
    ///
    /// Resolution order: in-memory cache → backing storage (`task_by_type`) →
    /// fresh id from the persisted id factory. The cache entry insertion is
    /// racy by design, so the `raw_entry` re-check handles a concurrent insert.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Fast path: the task is already in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Known from restored storage; still counts as a cache hit.
            self.track_cache_hit(&task_type);
            // Populate the in-memory cache unless someone raced us to it.
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                // A concurrent caller inserted the entry since the fast-path
                // check above; reuse its id.
                RawEntry::Occupied(e) => {
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                // Genuinely new task: allocate a persistent id and log the
                // cache entry for the next snapshot.
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    e.insert(task_type.clone(), task_id);
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    if let Some(log) = &self.persisted_task_cache_log {
                        log.lock(task_id).push((task_type.clone(), task_id));
                    }
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        if is_new && is_root {
            // Root tasks get the maximum aggregation number so they act as
            // aggregation roots in the graph.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1566
    /// Looks up or creates a transient (non-persisted) task for `task_type`,
    /// connects it as a child of `parent_task`, and returns its id.
    ///
    /// Panics (via `panic_persistent_calling_transient`) when a persistent
    /// parent tries to call a transient task, since that would make the
    /// persistent task's result depend on non-restorable state.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        // Fast path: the task is already in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        match raw_entry(&self.task_cache, &task_type) {
            // A concurrent caller inserted the entry since the check above.
            RawEntry::Occupied(e) => {
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            // Genuinely new transient task: allocate a transient id.
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                if is_root {
                    // Root tasks act as aggregation roots (max aggregation
                    // number), mirroring the persistent-task path.
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1639
1640 fn debug_trace_transient_task(
1643 &self,
1644 task_type: &CachedTaskType,
1645 cell_id: Option<CellId>,
1646 ) -> DebugTraceTransientTask {
1647 fn inner_id(
1650 backend: &TurboTasksBackendInner<impl BackingStorage>,
1651 task_id: TaskId,
1652 cell_type_id: Option<ValueTypeId>,
1653 visited_set: &mut FxHashSet<TaskId>,
1654 ) -> DebugTraceTransientTask {
1655 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1656 if visited_set.contains(&task_id) {
1657 let task_name = task_type.get_name();
1658 DebugTraceTransientTask::Collapsed {
1659 task_name,
1660 cell_type_id,
1661 }
1662 } else {
1663 inner_cached(backend, &task_type, cell_type_id, visited_set)
1664 }
1665 } else {
1666 DebugTraceTransientTask::Uncached { cell_type_id }
1667 }
1668 }
1669 fn inner_cached(
1670 backend: &TurboTasksBackendInner<impl BackingStorage>,
1671 task_type: &CachedTaskType,
1672 cell_type_id: Option<ValueTypeId>,
1673 visited_set: &mut FxHashSet<TaskId>,
1674 ) -> DebugTraceTransientTask {
1675 let task_name = task_type.get_name();
1676
1677 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1678 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1679 return None;
1683 };
1684 if task_id.is_transient() {
1685 Some(Box::new(inner_id(
1686 backend,
1687 task_id,
1688 cause_self_raw_vc.try_get_type_id(),
1689 visited_set,
1690 )))
1691 } else {
1692 None
1693 }
1694 });
1695 let cause_args = task_type
1696 .arg
1697 .get_raw_vcs()
1698 .into_iter()
1699 .filter_map(|raw_vc| {
1700 let Some(task_id) = raw_vc.try_get_task_id() else {
1701 return None;
1703 };
1704 if !task_id.is_transient() {
1705 return None;
1706 }
1707 Some((task_id, raw_vc.try_get_type_id()))
1708 })
1709 .collect::<IndexSet<_>>() .into_iter()
1711 .map(|(task_id, cell_type_id)| {
1712 inner_id(backend, task_id, cell_type_id, visited_set)
1713 })
1714 .collect();
1715
1716 DebugTraceTransientTask::Cached {
1717 task_name,
1718 cell_type_id,
1719 cause_self,
1720 cause_args,
1721 }
1722 }
1723 inner_cached(
1724 self,
1725 task_type,
1726 cell_id.map(|c| c.type_id),
1727 &mut FxHashSet::default(),
1728 )
1729 }
1730
1731 fn invalidate_task(
1732 &self,
1733 task_id: TaskId,
1734 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1735 ) {
1736 if !self.should_track_dependencies() {
1737 panic!("Dependency tracking is disabled so invalidation is not allowed");
1738 }
1739 operation::InvalidateOperation::run(
1740 smallvec![task_id],
1741 #[cfg(feature = "trace_task_dirty")]
1742 TaskDirtyCause::Invalidator,
1743 self.execute_context(turbo_tasks),
1744 );
1745 }
1746
1747 fn invalidate_tasks(
1748 &self,
1749 tasks: &[TaskId],
1750 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1751 ) {
1752 if !self.should_track_dependencies() {
1753 panic!("Dependency tracking is disabled so invalidation is not allowed");
1754 }
1755 operation::InvalidateOperation::run(
1756 tasks.iter().copied().collect(),
1757 #[cfg(feature = "trace_task_dirty")]
1758 TaskDirtyCause::Unknown,
1759 self.execute_context(turbo_tasks),
1760 );
1761 }
1762
1763 fn invalidate_tasks_set(
1764 &self,
1765 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1766 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1767 ) {
1768 if !self.should_track_dependencies() {
1769 panic!("Dependency tracking is disabled so invalidation is not allowed");
1770 }
1771 operation::InvalidateOperation::run(
1772 tasks.iter().copied().collect(),
1773 #[cfg(feature = "trace_task_dirty")]
1774 TaskDirtyCause::Unknown,
1775 self.execute_context(turbo_tasks),
1776 );
1777 }
1778
1779 fn invalidate_serialization(
1780 &self,
1781 task_id: TaskId,
1782 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1783 ) {
1784 if task_id.is_transient() {
1785 return;
1786 }
1787 let mut ctx = self.execute_context(turbo_tasks);
1788 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1789 task.invalidate_serialization();
1790 }
1791
1792 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1793 let task = self.storage.access_mut(task_id);
1794 if let Some(value) = task.get_persistent_task_type() {
1795 format!("{task_id:?} {}", value)
1796 } else if let Some(value) = task.get_transient_task_type() {
1797 format!("{task_id:?} {}", value)
1798 } else {
1799 format!("{task_id:?} unknown")
1800 }
1801 }
1802
1803 fn get_task_name(
1804 &self,
1805 task_id: TaskId,
1806 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1807 ) -> String {
1808 let mut ctx = self.execute_context(turbo_tasks);
1809 let task = ctx.task(task_id, TaskDataCategory::Data);
1810 if let Some(value) = task.get_persistent_task_type() {
1811 value.to_string()
1812 } else if let Some(value) = task.get_transient_task_type() {
1813 value.to_string()
1814 } else {
1815 "unknown".to_string()
1816 }
1817 }
1818
1819 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1820 let task = self.storage.access_mut(task_id);
1821 task.get_persistent_task_type().cloned()
1822 }
1823
    /// Handles cancellation of a task's execution: wakes anyone waiting on the
    /// task's done event (so they don't hang forever) and records the task as
    /// `Canceled` so it won't be started again from the stale state.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                // Notify waiters regardless of whether the task was merely
                // scheduled or already running.
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // After `take_in_progress` the slot must be empty, so this insert
        // should never replace an existing state.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
    }
1846
    /// Transitions a scheduled task into the in-progress state and returns the
    /// future (plus tracing span) to execute, or `None` when the task is not
    /// currently scheduled (e.g. already running, canceled, or a consumed
    /// once-task).
    ///
    /// Before execution starts, current collectibles are moved to the outdated
    /// set and (when dependency tracking is on) current cell/output
    /// dependencies are moved to their outdated sets, so that edges not
    /// re-established by the new execution can be cleaned up afterwards.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            // Once-tasks get special stale handling later (their future can
            // only be taken a single time).
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            if let Some(tasks) = task.prefetch() {
                // Prefetching needs the task lock released; re-acquire after.
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not in the `Scheduled` state: put the state back untouched
                // and decline to start.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move current collectibles to the outdated set; drop outdated
            // entries that no longer have a current counterpart.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            // Collected into a Vec first because mutating the task while
            // iterating its collectible maps is not possible.
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Snapshot current dependencies as "outdated"; the new
                // execution will re-register the ones that still hold.
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the execution future outside the task lock.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A once-task's future can only be taken once; `None`
                    // here means it already ran.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1946
    /// Finalizes a task execution: stores the new output, invalidates tasks
    /// depending on it, connects newly discovered children, and cleans up
    /// leftover state.
    ///
    /// Returns `true` when the task became stale during completion and must be
    /// re-executed, `false` when completion finished normally.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Two span variants: the detailed one records extra fields that are
        // filled in as completion progresses.
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: update in-progress state, compute the new output and the
        // set of tasks depending on it. `None` means the task went stale.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: the output changed, so dependents must be marked dirty.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        // Phase 3: propagate dirtiness for unfinished new children, then
        // connect them; connecting can also detect staleness.
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 4: commit the output and final flags; this is the last point
        // where staleness can be detected.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 5: drop removed cell data and in-progress cell state outside
        // any task locks.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2078
    /// First phase of task completion.
    ///
    /// Validates/updates the in-progress state, reschedules the task if it was
    /// marked stale during execution, updates per-cell-type counters, decides
    /// whether the output actually changed, determines whether the task can be
    /// considered immutable, and removes outdated graph edges.
    ///
    /// Returns `None` when the task was stale and has been rescheduled (the
    /// caller must not continue completion), otherwise the data needed by the
    /// remaining completion phases.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task's result is discarded entirely.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while running, so discard
        // this result and reschedule. Once-tasks are exempt because their
        // future cannot be executed a second time.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts;
            // only the never-connected ones need their counts released below.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Sync the per-cell-type max-index counters with what this execution
        // actually produced: update changed ones, remove vanished ones.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        // A task is a candidate for becoming immutable only if it isn't
        // session-dependent, has no invalidator, and collects no collectible
        // dependencies; its own dependencies are checked for immutability
        // further below.
        let is_immutable = task.immutable();
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Children that existed before but were not re-added by this
            // execution become outdated edges; re-added ones are removed from
            // `new_children` so they aren't connected twice.
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Only produce a new output value when it actually differs from the
        // currently stored one, so unchanged outputs don't dirty dependents.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                // Identical errors are also treated as "unchanged".
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only if every task it depends on is
        // already immutable (checked without holding this task's lock).
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2325
    /// Marks every task that reads this task's output as dirty, after the output
    /// changed during execution.
    ///
    /// `output_dependent_tasks` was collected while the task lock was still held;
    /// this runs afterwards. Large dependent sets are split into chunks and
    /// processed on separate threads via `scope_and_block`.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Prefetch task data for all dependents in one batch to avoid
        // per-task lookups below.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        /// Marks a single dependent task dirty, unless it is a `TransientTask::Once`
        /// task or it no longer has a backward output dependency on `task_id`.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            // Skip `Once` tasks entirely — they are never marked dirty here.
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // Dependency edge is already outdated: still mark dirty, but
                // without making the dependent stale.
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                // The dependent no longer depends on this output; nothing to do.
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Many dependents: process in parallel chunks, each worker with its
            // own aggregation queue executed before the thread finishes.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2422
    /// Marks new children that have not produced an output yet as dirty, so they
    /// count as unfinished work in the aggregated dirty state.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
            // A child without an output has never completed an execution.
            if !child_task.has_output() {
                let child_id = child_task.id();
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    false, // make_stale
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        });

        queue.execute(ctx);
    }
2448
    /// Connects the new children discovered during execution to the task graph.
    ///
    /// Returns `true` when the task was stale and has been rescheduled instead of
    /// connecting the children (their active counts are decreased in that case).
    /// Returns `false` when the task was canceled or the children were connected.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while executing, so reschedule
        // it instead of connecting the (now discarded) children.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // The children won't be connected, so revert their active counts.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2516
    /// Finalizes the task after execution: stores the new output, updates the
    /// dirty/session-dependent state, and propagates aggregated dirty updates.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task must run again (it became stale while
    ///   executing, or under `verify_determinism` when its result changed).
    /// - `in_progress_cells` are the cell states readers may have been waiting on;
    ///   their events have already been notified.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // New children must have been connected in an earlier step.
        debug_assert!(new_children.is_empty());

        // A stale (non-once) task was invalidated while executing: discard the
        // result and schedule it again instead of finishing.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Store the new output; the old content is kept alive until after the task
        // lock is released (see `drop(old_content)` below).
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake all readers that were waiting on cells of this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // A session-dependent task stays dirty across sessions but is clean for the
        // current session after this execution.
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // Compute the aggregated dirty/clean delta to propagate to upper tasks.
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            // Net effect is "became clean": notify waiters and drop the
            // active-until-clean marker.
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                if let Some(activeness_state) = task.get_activeness_mut() {
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, a changed result (or a missing output) causes
        // persistent non-once tasks to be re-executed — presumably to surface
        // non-deterministic tasks.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Only signal completion when the task won't be re-executed.
            done_event.notify(usize::MAX);
        }

        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2686
2687 fn task_execution_completed_cleanup(
2688 &self,
2689 ctx: &mut impl ExecuteContext<'_>,
2690 task_id: TaskId,
2691 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2692 is_error: bool,
2693 ) -> Vec<SharedReference> {
2694 let mut task = ctx.task(task_id, TaskDataCategory::All);
2695 let mut removed_cell_data = Vec::new();
2696 if !is_error {
2700 let to_remove_persistent: Vec<_> = task
2707 .iter_persistent_cell_data()
2708 .filter_map(|(cell, _)| {
2709 cell_counters
2710 .get(&cell.type_id)
2711 .is_none_or(|start_index| cell.index >= *start_index)
2712 .then_some(*cell)
2713 })
2714 .collect();
2715
2716 let to_remove_transient: Vec<_> = task
2718 .iter_transient_cell_data()
2719 .filter_map(|(cell, _)| {
2720 cell_counters
2721 .get(&cell.type_id)
2722 .is_none_or(|start_index| cell.index >= *start_index)
2723 .then_some(*cell)
2724 })
2725 .collect();
2726 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2727 for cell in to_remove_persistent {
2728 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2729 removed_cell_data.push(data.into_untyped());
2730 }
2731 }
2732 for cell in to_remove_transient {
2733 if let Some(data) = task.remove_transient_cell_data(&cell) {
2734 removed_cell_data.push(data);
2735 }
2736 }
2737 }
2738
2739 task.cleanup_after_execution();
2743
2744 drop(task);
2745
2746 removed_cell_data
2748 }
2749
    /// Runs a backend background job. Both snapshot jobs drive the periodic
    /// persisting loop: `InitialSnapshot` starts it (longer first wait),
    /// `FollowUpSnapshot` continues it at the regular interval.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        // The first snapshot waits longer than follow-up snapshots.
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // When the system is idle, snapshot earlier (after the
                            // idle timeout) instead of waiting the full interval.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot_and_persist(None, reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            // Nothing new was persisted: stay in this loop and wait
                            // for the next interval instead of scheduling a job.
                            if !new_data {
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            // Hand off to a fresh background job for the next cycle.
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2838
2839 fn try_read_own_task_cell(
2840 &self,
2841 task_id: TaskId,
2842 cell: CellId,
2843 options: ReadCellOptions,
2844 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2845 ) -> Result<TypedCellContent> {
2846 let mut ctx = self.execute_context(turbo_tasks);
2847 let task = ctx.task(task_id, TaskDataCategory::Data);
2848 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2849 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2850 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2851 } else {
2852 Ok(CellContent(None).into_typed(cell.type_id))
2853 }
2854 }
2855
    /// Reads all collectibles of `collectible_type` visible from `task_id`.
    ///
    /// The task is first upgraded to a root aggregation node so its aggregated
    /// collectibles are complete. When `reader_id` is given, dependency edges are
    /// registered in both directions so the reader is invalidated on changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Raise the aggregation number until the task is a root node.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // NOTE(review): aggregated entries contribute 1 regardless of `count`,
            // while local collectibles below add `count` — verify this asymmetry
            // is intended.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Register the forward dependency on the reader side (unless it was
            // only outdated, in which case the outdated entry is revived).
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2920
2921 fn emit_collectible(
2922 &self,
2923 collectible_type: TraitTypeId,
2924 collectible: RawVc,
2925 task_id: TaskId,
2926 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2927 ) {
2928 self.assert_valid_collectible(task_id, collectible);
2929
2930 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2931 panic!("Collectibles need to be resolved");
2932 };
2933 let cell = CellRef {
2934 task: collectible_task,
2935 cell,
2936 };
2937 operation::UpdateCollectibleOperation::run(
2938 task_id,
2939 CollectibleRef {
2940 collectible_type,
2941 cell,
2942 },
2943 1,
2944 self.execute_context(turbo_tasks),
2945 );
2946 }
2947
2948 fn unemit_collectible(
2949 &self,
2950 collectible_type: TraitTypeId,
2951 collectible: RawVc,
2952 count: u32,
2953 task_id: TaskId,
2954 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2955 ) {
2956 self.assert_valid_collectible(task_id, collectible);
2957
2958 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2959 panic!("Collectibles need to be resolved");
2960 };
2961 let cell = CellRef {
2962 task: collectible_task,
2963 cell,
2964 };
2965 operation::UpdateCollectibleOperation::run(
2966 task_id,
2967 CollectibleRef {
2968 collectible_type,
2969 cell,
2970 },
2971 -(i32::try_from(count).unwrap()),
2972 self.execute_context(turbo_tasks),
2973 );
2974 }
2975
    /// Updates a cell of a task with new content by running an
    /// `UpdateCellOperation`.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        // NOTE: `run` takes `content` before `is_serializable_cell_content` —
        // order differs from this function's parameters.
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
2996
    /// Marks the currently executing task as session dependent (its result may
    /// differ between sessions). No-op when dependency tracking is disabled.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking we don't need session dependent tasks.
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        // Ensure the task has at least the session-dependent aggregation number.
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Flag the in-progress execution so `task_execution_completed_finish`
        // records the session-dependent dirtyness.
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3032
3033 fn mark_own_task_as_finished(
3034 &self,
3035 task: TaskId,
3036 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3037 ) {
3038 let mut ctx = self.execute_context(turbo_tasks);
3039 let mut task = ctx.task(task, TaskDataCategory::Data);
3040 if let Some(InProgressState::InProgress(box InProgressStateInner {
3041 marked_as_completed,
3042 ..
3043 })) = task.get_in_progress_mut()
3044 {
3045 *marked_as_completed = true;
3046 }
3051 }
3052
    /// Connects `task` as a child of `parent_task`, after validating that a
    /// persistent parent does not connect to a transient task.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3062
    /// Creates a new transient task of the given type and initializes its storage
    /// entry. Registered as a root task for graph verification builds.
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        {
            let mut task = self.storage.access_mut(task_id);
            task.init_transient_task(task_id, task_type, self.should_track_activeness());
        }
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }
3073
    /// Disposes a root task: removes its root marker, but keeps it active until it
    /// and its dirty containers are clean; if already clean, the activeness state
    /// is dropped and `all_clean_event` waiters are notified.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            // Still dirty: stay active until everything becomes clean.
            if let Some(activeness_state) = task.get_activeness_mut() {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Already clean: release waiters immediately.
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3098
    /// Debug-only consistency check (feature `verify_aggregation_graph`): walks the
    /// task graph from every registered root task and panics when an aggregation
    /// invariant is violated (a reachable task that reports to no root, a local
    /// collectible missing from the uppers, or a dirty task missing from an
    /// upper's dirty-container list). Can be opted out via
    /// `TURBO_ENGINE_VERIFY_GRAPH=0`.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        // `use` items apply to the whole function body regardless of position, so
        // the `env::var` call above is fine.
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over children, starting from the root task.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // When invoked as an idle-time check, bail out as soon as the
                // system becomes busy again.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record aggregated collectibles; the flag is set to true once the
                // emitting task itself is visited.
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // Dirty tasks must appear in the dirty-container list of every
                // upper task.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any aggregated collectible whose emitting task was never seen is a
            // dangling upper connection; dump the upper chain before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3287
3288 fn assert_not_persistent_calling_transient(
3289 &self,
3290 parent_id: Option<TaskId>,
3291 child_id: TaskId,
3292 cell_id: Option<CellId>,
3293 ) {
3294 if let Some(parent_id) = parent_id
3295 && !parent_id.is_transient()
3296 && child_id.is_transient()
3297 {
3298 self.panic_persistent_calling_transient(
3299 self.debug_get_task_description(parent_id),
3300 self.debug_get_cached_task_type(child_id).as_deref(),
3301 cell_id,
3302 );
3303 }
3304 }
3305
3306 fn panic_persistent_calling_transient(
3307 &self,
3308 parent: String,
3309 child: Option<&CachedTaskType>,
3310 cell_id: Option<CellId>,
3311 ) {
3312 let transient_reason = if let Some(child) = child {
3313 Cow::Owned(format!(
3314 " The callee is transient because it depends on:\n{}",
3315 self.debug_trace_transient_task(child, cell_id),
3316 ))
3317 } else {
3318 Cow::Borrowed("")
3319 };
3320 panic!(
3321 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3322 parent,
3323 child.map_or("unknown", |t| t.get_name()),
3324 transient_reason,
3325 );
3326 }
3327
    /// Validates that a collectible is a resolved `TaskCell` and that a persistent
    /// task does not emit a transient collectible; panics with diagnostics
    /// otherwise.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Unresolved collectible: include the producing task in the message
            // when it can be determined.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            // Explain why the collectible is transient when possible.
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3359}
3360
3361impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner backend implementation.
        self.0.startup(turbo_tasks);
    }
3365
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner implementation; the API handle is not needed.
        self.0.stopping();
    }
3369
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner backend implementation.
        self.0.stop(turbo_tasks);
    }
3373
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner backend implementation.
        self.0.idle_start(turbo_tasks);
    }
3377
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner implementation; the API handle is not needed.
        self.0.idle_end();
    }
3381
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        // Forwarded to the inner backend implementation.
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }
3391
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        // Forwarded to the inner backend implementation.
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }
3401
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner backend implementation.
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3405
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner backend implementation.
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3409
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        // Forwarded to the inner backend implementation.
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }
3417
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        // Forwarded to the inner backend implementation.
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }
3425
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        // Forwarded to the inner backend implementation.
        self.0.task_execution_canceled(task, turbo_tasks)
    }
3429
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        // Forwarded to the inner backend implementation.
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }
3439
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        // Forwarded to the inner backend implementation.
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3459
3460 type BackendJob = TurboTasksBackendJob;
3461
3462 fn run_backend_job<'a>(
3463 &'a self,
3464 job: Self::BackendJob,
3465 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3466 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3467 self.0.run_backend_job(job, turbo_tasks)
3468 }
3469
3470 fn try_read_task_output(
3471 &self,
3472 task_id: TaskId,
3473 reader: Option<TaskId>,
3474 options: ReadOutputOptions,
3475 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3476 ) -> Result<Result<RawVc, EventListener>> {
3477 self.0
3478 .try_read_task_output(task_id, reader, options, turbo_tasks)
3479 }
3480
3481 fn try_read_task_cell(
3482 &self,
3483 task_id: TaskId,
3484 cell: CellId,
3485 reader: Option<TaskId>,
3486 options: ReadCellOptions,
3487 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3488 ) -> Result<Result<TypedCellContent, EventListener>> {
3489 self.0
3490 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3491 }
3492
3493 fn try_read_own_task_cell(
3494 &self,
3495 task_id: TaskId,
3496 cell: CellId,
3497 options: ReadCellOptions,
3498 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3499 ) -> Result<TypedCellContent> {
3500 self.0
3501 .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3502 }
3503
3504 fn read_task_collectibles(
3505 &self,
3506 task_id: TaskId,
3507 collectible_type: TraitTypeId,
3508 reader: Option<TaskId>,
3509 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3510 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3511 self.0
3512 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3513 }
3514
3515 fn emit_collectible(
3516 &self,
3517 collectible_type: TraitTypeId,
3518 collectible: RawVc,
3519 task_id: TaskId,
3520 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3521 ) {
3522 self.0
3523 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3524 }
3525
3526 fn unemit_collectible(
3527 &self,
3528 collectible_type: TraitTypeId,
3529 collectible: RawVc,
3530 count: u32,
3531 task_id: TaskId,
3532 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3533 ) {
3534 self.0
3535 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3536 }
3537
3538 fn update_task_cell(
3539 &self,
3540 task_id: TaskId,
3541 cell: CellId,
3542 is_serializable_cell_content: bool,
3543 content: CellContent,
3544 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3545 verification_mode: VerificationMode,
3546 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3547 ) {
3548 self.0.update_task_cell(
3549 task_id,
3550 cell,
3551 is_serializable_cell_content,
3552 content,
3553 updated_key_hashes,
3554 verification_mode,
3555 turbo_tasks,
3556 );
3557 }
3558
3559 fn mark_own_task_as_finished(
3560 &self,
3561 task_id: TaskId,
3562 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3563 ) {
3564 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3565 }
3566
3567 fn mark_own_task_as_session_dependent(
3568 &self,
3569 task: TaskId,
3570 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3571 ) {
3572 self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3573 }
3574
3575 fn connect_task(
3576 &self,
3577 task: TaskId,
3578 parent_task: Option<TaskId>,
3579 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3580 ) {
3581 self.0.connect_task(task, parent_task, turbo_tasks);
3582 }
3583
3584 fn create_transient_task(
3585 &self,
3586 task_type: TransientTaskType,
3587 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3588 ) -> TaskId {
3589 self.0.create_transient_task(task_type)
3590 }
3591
3592 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3593 self.0.dispose_root_task(task_id, turbo_tasks);
3594 }
3595
3596 fn task_statistics(&self) -> &TaskStatisticsApi {
3597 &self.0.task_statistics
3598 }
3599
3600 fn is_tracking_dependencies(&self) -> bool {
3601 self.0.options.dependency_tracking
3602 }
3603
3604 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3605 self.0.get_task_name(task, turbo_tasks)
3606 }
3607}
3608
/// A node in the cause tree explaining why a transient task was reached,
/// rendered as an indented, human-readable trace via its `Display` impl.
enum DebugTraceTransientTask {
    /// A resolved cached task together with its (recursive) causes.
    Cached {
        task_name: &'static str,
        // When set, the dependency was a cell read; identifies the cell's
        // value type for display.
        cell_type_id: Option<ValueTypeId>,
        // Cause chain attributed to the task's `self` argument, if any.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        // Cause chains attributed to the task's other arguments.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A resolved cached task whose causes are not included; rendered with a
    /// trailing "(collapsed)" marker.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task that could not be resolved to a name; rendered as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3625
3626impl DebugTraceTransientTask {
3627 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3628 let indent = " ".repeat(level);
3629 f.write_str(&indent)?;
3630
3631 fn fmt_cell_type_id(
3632 f: &mut fmt::Formatter<'_>,
3633 cell_type_id: Option<ValueTypeId>,
3634 ) -> fmt::Result {
3635 if let Some(ty) = cell_type_id {
3636 write!(f, " (read cell of type {})", get_value_type(ty).global_name)
3637 } else {
3638 Ok(())
3639 }
3640 }
3641
3642 match self {
3644 Self::Cached {
3645 task_name,
3646 cell_type_id,
3647 ..
3648 }
3649 | Self::Collapsed {
3650 task_name,
3651 cell_type_id,
3652 ..
3653 } => {
3654 f.write_str(task_name)?;
3655 fmt_cell_type_id(f, *cell_type_id)?;
3656 if matches!(self, Self::Collapsed { .. }) {
3657 f.write_str(" (collapsed)")?;
3658 }
3659 }
3660 Self::Uncached { cell_type_id } => {
3661 f.write_str("unknown transient task")?;
3662 fmt_cell_type_id(f, *cell_type_id)?;
3663 }
3664 }
3665 f.write_char('\n')?;
3666
3667 if let Self::Cached {
3669 cause_self,
3670 cause_args,
3671 ..
3672 } = self
3673 {
3674 if let Some(c) = cause_self {
3675 writeln!(f, "{indent} self:")?;
3676 c.fmt_indented(f, level + 1)?;
3677 }
3678 if !cause_args.is_empty() {
3679 writeln!(f, "{indent} args:")?;
3680 for c in cause_args {
3681 c.fmt_indented(f, level + 1)?;
3682 }
3683 }
3684 }
3685 Ok(())
3686 }
3687}
3688
3689impl fmt::Display for DebugTraceTransientTask {
3690 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3691 self.fmt_indented(f, 0)
3692 }
3693}
3694
3695fn far_future() -> Instant {
3697 Instant::now() + Duration::from_secs(86400 * 365 * 30)
3702}
3703
3704fn encode_task_data(
3709 task: TaskId,
3710 data: &TaskStorage,
3711 category: SpecificTaskDataCategory,
3712 scratch_buffer: &mut TurboBincodeBuffer,
3713) -> Result<TurboBincodeBuffer> {
3714 scratch_buffer.clear();
3715 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3716 data.encode(category, &mut encoder)?;
3717
3718 if cfg!(feature = "verify_serialization") {
3719 TaskStorage::new()
3720 .decode(
3721 category,
3722 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3723 )
3724 .with_context(|| {
3725 format!(
3726 "expected to be able to decode serialized data for '{category:?}' information \
3727 for {task}"
3728 )
3729 })?;
3730 }
3731 Ok(SmallVec::from_slice(scratch_buffer))
3732}