1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33 backend::{
34 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37 VerificationMode,
38 },
39 event::{Event, EventDescription, EventListener},
40 message_queue::{TimingEvent, TraceEvent},
41 registry::get_value_type,
42 scope::scope_and_block,
43 task_statistics::TaskStatisticsApi,
44 trace::TraceRawVcs,
45 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49 operation::AnyOperation,
50 storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55 backend::{
56 operation::{
57 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60 TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61 is_root_node, make_task_dirty_internal, prepare_new_children,
62 },
63 storage::Storage,
64 storage_schema::{TaskStorage, TaskStorageAccessors},
65 },
66 backing_storage::{BackingStorage, SnapshotItem},
67 data::{
68 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69 InProgressState, InProgressStateInner, OutputValue, TransientTask,
70 },
71 error::TaskError,
72 utils::{
73 arc_or_owned::ArcOrOwned,
74 chunked_vec::ChunkedVec,
75 dash_map_drop_contents::drop_contents,
76 dash_map_raw_entry::{RawEntry, raw_entry},
77 ptr_eq_arc::PtrEqArc,
78 shard_amount::compute_shard_amount,
79 sharded::Sharded,
80 swap_retain,
81 },
82};
83
/// Above this many dependent tasks, dirty-marking is split into parallel chunks.
/// NOTE(review): "PARALLIZATION" is a typo for "PARALLELIZATION"; not renamed here
/// because other (not visible) uses in this file would need to change too.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Highest bit of `in_progress_operations`: doubles as the "snapshot requested"
/// flag, so the remaining bits hold the count of active operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// Idle time before a snapshot is considered, overridable via the
/// `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` env var (defaults to 2s;
/// unparsable values silently fall back to the default).
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
100
/// Shared state (behind `TurboTasksBackendInner::snapshot_request`) used to
/// coordinate a snapshot with in-flight operations.
struct SnapshotRequest {
    /// True while a snapshot is being taken; operations must suspend.
    snapshot_requested: bool,
    /// Operations that suspended themselves for the current snapshot, so their
    /// state can be captured by the snapshot.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
105
106impl SnapshotRequest {
107 fn new() -> Self {
108 Self {
109 snapshot_requested: false,
110 suspended_operations: FxHashSet::default(),
111 }
112 }
113}
114
/// How the backend interacts with its backing storage.
pub enum StorageMode {
    /// Data is restored from storage but never written back.
    ReadOnly,
    /// Data is restored and written back (snapshots are persisted).
    ReadWrite,
    /// Data is restored; `should_persist()` is also true for this mode, but —
    /// per the name — writing back is deferred until shutdown.
    /// NOTE(review): deferral logic is not visible in this chunk; confirm.
    ReadWriteOnShutdown,
}
125
/// Configuration for [`TurboTasksBackend`]; see [`BackendOptions::default`].
pub struct BackendOptions {
    /// Enables tracking of read dependencies between tasks. When disabled,
    /// `active_tracking` is forced off as well (see `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    /// Enables tracking of task activeness (scheduling dirty tasks while active).
    pub active_tracking: bool,

    /// How to use backing storage; `None` disables both restore and persist.
    pub storage_mode: Option<StorageMode>,

    /// Worker-count hint used to size internal shard counts.
    pub num_workers: Option<usize>,

    /// Use smaller initial allocations (also affects shard sizing).
    pub small_preallocation: bool,
}
150
151impl Default for BackendOptions {
152 fn default() -> Self {
153 Self {
154 dependency_tracking: true,
155 active_tracking: true,
156 storage_mode: Some(StorageMode::ReadWrite),
157 num_workers: None,
158 small_preallocation: false,
159 }
160 }
161}
162
/// Background jobs the backend schedules for itself (the job runner is not
/// visible in this chunk — presumably it calls `snapshot_and_persist`).
pub enum TurboTasksBackendJob {
    /// The first snapshot after startup.
    InitialSnapshot,
    /// A subsequent, periodic snapshot.
    FollowUpSnapshot,
}
167
/// Public backend type: a shareable (`Arc`) wrapper around the actual state in
/// [`TurboTasksBackendInner`].
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded log of newly cached task types and their assigned ids, collected so
/// they can be written to backing storage during a snapshot.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    // Time the backend was created; used for relative timestamps.
    start_time: Instant,

    // Id factory for persisted tasks: ids below TRANSIENT_TASK_BIT,
    // starting after the ids already used by the backing storage.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    // Id factory for transient tasks: ids at or above TRANSIENT_TASK_BIT.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Log of (task type, id) pairs still to be persisted; only allocated when
    // the storage mode writes back (see `new`).
    persisted_task_cache_log: Option<TaskCacheLog>,
    // In-memory cache from task type to assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    storage: Storage,

    // True when the backing storage already handed out task ids in a previous
    // session, i.e. local state is only a partial view of the persisted graph.
    local_is_partial: bool,

    // Count of currently running operations; the high bit is
    // SNAPSHOT_REQUESTED_BIT (snapshot pending), see `start_operation` and
    // `operation_suspend_point`.
    in_progress_operations: AtomicUsize,

    // Coordination state for snapshots (guarded by this mutex).
    snapshot_request: Mutex<SnapshotRequest>,
    // Notified when the last active operation suspends for a snapshot.
    operations_suspended: Condvar,
    // Notified when the snapshot is done and operations may resume.
    snapshot_completed: Condvar,
    // Timestamp (relative, millis presumably) of the last snapshot.
    // NOTE(review): the writer of this field is outside this chunk.
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
221
222impl<B: BackingStorage> TurboTasksBackend<B> {
223 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
224 Self(Arc::new(TurboTasksBackendInner::new(
225 options,
226 backing_storage,
227 )))
228 }
229
230 pub fn backing_storage(&self) -> &B {
231 &self.0.backing_storage
232 }
233}
234
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Persisted task ids start after the highest id already handed out by
    /// `backing_storage` and stay below `TRANSIENT_TASK_BIT`; transient task
    /// ids live at or above `TRANSIENT_TASK_BIT`.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // The task cache log is only needed when we will write back to storage.
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        // Activeness tracking is meaningless without dependency tracking.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            // If ids were already handed out in a previous session, local state
            // is only a partial view of the persisted graph.
            local_is_partial: next_task_id != TaskId::MIN,
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an [`ExecuteContext`] for accessing task storage in the scope of
    /// the given turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// True when a snapshot has requested that operations suspend (the high bit
    /// of `in_progress_operations` is set). Always false when persisting is
    /// disabled.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Cooperative suspension point for long-running operations. When a
    /// snapshot is pending, the operation's resumable state is captured via
    /// `suspend`, registered with the snapshot request, and this call blocks
    /// until the snapshot completes.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                // Publish the suspended state so the snapshot can capture it.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Deregister as active. When only SNAPSHOT_REQUESTED_BIT is
                // left, every operation is suspended and the snapshotting
                // thread (waiting on `operations_suspended`) may proceed.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block until the snapshot finishes.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Re-register as active and withdraw the suspended state.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        // Fast path: nothing to do unless a snapshot is pending.
        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Registers the start of an operation, returning a guard that deregisters
    /// it on drop. If a snapshot is currently pending, blocks until the
    /// snapshot completes before registering.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            // No snapshots are ever taken, so no coordination is needed.
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            // A snapshot is pending: undo the registration, wake the snapshot
            // thread if we were the last counted operation, and wait.
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// True when data is written back to the backing storage (ReadWrite or
    /// ReadWriteOnShutdown).
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// True when data is restored from the backing storage (any storage mode).
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// True when read dependencies between tasks are tracked.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Assigns an initial aggregation number to a new task: root tasks become
    /// root aggregation nodes (u32::MAX); session-dependent tasks get a high
    /// number so they aggregate broadly; all other tasks are left untouched.
    fn set_initial_aggregation_number(
        &self,
        task_id: TaskId,
        is_root: bool,
        is_session_dependent: bool,
        ctx: &mut impl ExecuteContext<'_>,
    ) {
        let base_aggregation_number = if is_root {
            u32::MAX
        } else if is_session_dependent && self.should_track_dependencies() {
            const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
            SESSION_DEPENDENT_AGGREGATION_NUMBER
        } else {
            // No special aggregation number needed.
            return;
        };

        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id,
                base_aggregation_number,
                distance: None,
            },
            ctx,
        );
    }

    /// True when task activeness is tracked.
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task-cache hit in the (optional) statistics.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task-cache miss in the (optional) statistics.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }

    /// Converts a stored [`TaskError`] into a [`TurboTasksExecutionError`],
    /// recursively converting error sources. For `TaskChain` errors, the
    /// innermost task's current output error is looked up (it may no longer be
    /// available) and then wrapped in one `TaskContext` layer per chain entry.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                // The last chain entry is the task whose output holds the root error.
                let task_id = chain.last().unwrap();
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // The task has been recomputed or evicted since the
                        // error was recorded.
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap in task-context layers, innermost chain entry first.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
474
/// RAII guard for an active operation, counted in
/// `TurboTasksBackendInner::in_progress_operations`. `backend` is `None` when
/// persisting is disabled and no snapshot coordination is needed.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
478
479impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
480 fn drop(&mut self) {
481 if let Some(backend) = self.backend {
482 let fetch_sub = backend
483 .in_progress_operations
484 .fetch_sub(1, Ordering::AcqRel);
485 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
486 backend.operations_suspended.notify_all();
487 }
488 }
489 }
490}
491
/// Intermediate result of preparing the completion of a task execution.
/// NOTE(review): field meanings are inferred from names; the producing and
/// consuming code is not visible in this chunk — confirm there.
struct TaskExecutionCompletePrepareResult {
    // Children connected by this execution.
    pub new_children: FxHashSet<TaskId>,
    // Whether the task is now considered immutable.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    // Replacement output value, if the output changed.
    pub new_output: Option<OutputValue>,
    // Tasks depending on this task's output.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
501
502impl<B: BackingStorage> TurboTasksBackendInner<B> {
504 fn try_read_task_output(
505 self: &Arc<Self>,
506 task_id: TaskId,
507 reader: Option<TaskId>,
508 options: ReadOutputOptions,
509 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
510 ) -> Result<Result<RawVc, EventListener>> {
511 self.assert_not_persistent_calling_transient(reader, task_id, None);
512
513 let mut ctx = self.execute_context(turbo_tasks);
514 let need_reader_task = if self.should_track_dependencies()
515 && !matches!(options.tracking, ReadTracking::Untracked)
516 && reader.is_some_and(|reader_id| reader_id != task_id)
517 && let Some(reader_id) = reader
518 && reader_id != task_id
519 {
520 Some(reader_id)
521 } else {
522 None
523 };
524 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
525 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
529 (task, Some(reader))
530 } else {
531 (ctx.task(task_id, TaskDataCategory::All), None)
532 };
533
534 fn listen_to_done_event(
535 reader_description: Option<EventDescription>,
536 tracking: ReadTracking,
537 done_event: &Event,
538 ) -> EventListener {
539 done_event.listen_with_note(move || {
540 move || {
541 if let Some(reader_description) = reader_description.as_ref() {
542 format!(
543 "try_read_task_output from {} ({})",
544 reader_description, tracking
545 )
546 } else {
547 format!("try_read_task_output ({})", tracking)
548 }
549 }
550 })
551 }
552
553 fn check_in_progress(
554 task: &impl TaskGuard,
555 reader_description: Option<EventDescription>,
556 tracking: ReadTracking,
557 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
558 {
559 match task.get_in_progress() {
560 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
561 listen_to_done_event(reader_description, tracking, done_event),
562 ))),
563 Some(InProgressState::InProgress(box InProgressStateInner {
564 done_event, ..
565 })) => Some(Ok(Err(listen_to_done_event(
566 reader_description,
567 tracking,
568 done_event,
569 )))),
570 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
571 "{} was canceled",
572 task.get_task_description()
573 ))),
574 None => None,
575 }
576 }
577
578 if matches!(options.consistency, ReadConsistency::Strong) {
579 loop {
581 let aggregation_number = get_aggregation_number(&task);
582 if is_root_node(aggregation_number) {
583 break;
584 }
585 drop(task);
586 drop(reader_task);
587 {
588 let _span = tracing::trace_span!(
589 "make root node for strongly consistent read",
590 %task_id
591 )
592 .entered();
593 AggregationUpdateQueue::run(
594 AggregationUpdateJob::UpdateAggregationNumber {
595 task_id,
596 base_aggregation_number: u32::MAX,
597 distance: None,
598 },
599 &mut ctx,
600 );
601 }
602 (task, reader_task) = if let Some(reader_id) = need_reader_task {
603 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
605 (task, Some(reader))
606 } else {
607 (ctx.task(task_id, TaskDataCategory::All), None)
608 }
609 }
610
611 let is_dirty = task.is_dirty();
612
613 let has_dirty_containers = task.has_dirty_containers();
615 if has_dirty_containers || is_dirty.is_some() {
616 let activeness = task.get_activeness_mut();
617 let mut task_ids_to_schedule: Vec<_> = Vec::new();
618 let activeness = if let Some(activeness) = activeness {
620 activeness.set_active_until_clean();
624 activeness
625 } else {
626 if ctx.should_track_activeness() {
630 task_ids_to_schedule = task.dirty_containers().collect();
632 task_ids_to_schedule.push(task_id);
633 }
634 let activeness =
635 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
636 activeness.set_active_until_clean();
637 activeness
638 };
639 let listener = activeness.all_clean_event.listen_with_note(move || {
640 let this = self.clone();
641 let tt = turbo_tasks.pin();
642 move || {
643 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
644 let mut ctx = this.execute_context(tt);
645 let mut visited = FxHashSet::default();
646 fn indent(s: &str) -> String {
647 s.split_inclusive('\n')
648 .flat_map(|line: &str| [" ", line].into_iter())
649 .collect::<String>()
650 }
651 fn get_info(
652 ctx: &mut impl ExecuteContext<'_>,
653 task_id: TaskId,
654 parent_and_count: Option<(TaskId, i32)>,
655 visited: &mut FxHashSet<TaskId>,
656 ) -> String {
657 let task = ctx.task(task_id, TaskDataCategory::All);
658 let is_dirty = task.is_dirty();
659 let in_progress =
660 task.get_in_progress()
661 .map_or("not in progress", |p| match p {
662 InProgressState::InProgress(_) => "in progress",
663 InProgressState::Scheduled { .. } => "scheduled",
664 InProgressState::Canceled => "canceled",
665 });
666 let activeness = task.get_activeness().map_or_else(
667 || "not active".to_string(),
668 |activeness| format!("{activeness:?}"),
669 );
670 let aggregation_number = get_aggregation_number(&task);
671 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
672 {
673 let uppers = get_uppers(&task);
674 !uppers.contains(&parent_task_id)
675 } else {
676 false
677 };
678
679 let has_dirty_containers = task.has_dirty_containers();
681
682 let task_description = task.get_task_description();
683 let is_dirty_label = if let Some(parent_priority) = is_dirty {
684 format!(", dirty({parent_priority})")
685 } else {
686 String::new()
687 };
688 let has_dirty_containers_label = if has_dirty_containers {
689 ", dirty containers"
690 } else {
691 ""
692 };
693 let count = if let Some((_, count)) = parent_and_count {
694 format!(" {count}")
695 } else {
696 String::new()
697 };
698 let mut info = format!(
699 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
700 {in_progress}, \
701 {activeness}{is_dirty_label}{has_dirty_containers_label})",
702 );
703 let children: Vec<_> = task.dirty_containers_with_count().collect();
704 drop(task);
705
706 if missing_upper {
707 info.push_str("\n ERROR: missing upper connection");
708 }
709
710 if has_dirty_containers || !children.is_empty() {
711 writeln!(info, "\n dirty tasks:").unwrap();
712
713 for (child_task_id, count) in children {
714 let task_description = ctx
715 .task(child_task_id, TaskDataCategory::Data)
716 .get_task_description();
717 if visited.insert(child_task_id) {
718 let child_info = get_info(
719 ctx,
720 child_task_id,
721 Some((task_id, count)),
722 visited,
723 );
724 info.push_str(&indent(&child_info));
725 if !info.ends_with('\n') {
726 info.push('\n');
727 }
728 } else {
729 writeln!(
730 info,
731 " {child_task_id} {task_description} {count} \
732 (already visited)"
733 )
734 .unwrap();
735 }
736 }
737 }
738 info
739 }
740 let info = get_info(&mut ctx, task_id, None, &mut visited);
741 format!(
742 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
743 )
744 }
745 });
746 drop(reader_task);
747 drop(task);
748 if !task_ids_to_schedule.is_empty() {
749 let mut queue = AggregationUpdateQueue::new();
750 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
751 queue.execute(&mut ctx);
752 }
753
754 return Ok(Err(listener));
755 }
756 }
757
758 let reader_description = reader_task
759 .as_ref()
760 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
761 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
762 {
763 return value;
764 }
765
766 if let Some(output) = task.get_output() {
767 let result = match output {
768 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
769 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
770 OutputValue::Error(error) => Err(error.clone()),
771 };
772 if let Some(mut reader_task) = reader_task.take()
773 && options.tracking.should_track(result.is_err())
774 && (!task.immutable() || cfg!(feature = "verify_immutable"))
775 {
776 #[cfg(feature = "trace_task_output_dependencies")]
777 let _span = tracing::trace_span!(
778 "add output dependency",
779 task = %task_id,
780 dependent_task = ?reader
781 )
782 .entered();
783 let mut queue = LeafDistanceUpdateQueue::new();
784 let reader = reader.unwrap();
785 if task.add_output_dependent(reader) {
786 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
788 let reader_leaf_distance =
789 reader_task.get_leaf_distance().copied().unwrap_or_default();
790 if reader_leaf_distance.distance <= leaf_distance.distance {
791 queue.push(
792 reader,
793 leaf_distance.distance,
794 leaf_distance.max_distance_in_buffer,
795 );
796 }
797 }
798
799 drop(task);
800
801 if !reader_task.remove_outdated_output_dependencies(&task_id) {
807 let _ = reader_task.add_output_dependencies(task_id);
808 }
809 drop(reader_task);
810
811 queue.execute(&mut ctx);
812 } else {
813 drop(task);
814 }
815
816 return result.map_err(|error| {
817 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
818 .with_task_context(task_id, turbo_tasks.pin())
819 .into()
820 });
821 }
822 drop(reader_task);
823
824 let note = EventDescription::new(|| {
825 move || {
826 if let Some(reader) = reader_description.as_ref() {
827 format!("try_read_task_output (recompute) from {reader}",)
828 } else {
829 "try_read_task_output (recompute, untracked)".to_string()
830 }
831 }
832 });
833
834 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
836 TaskExecutionReason::OutputNotAvailable,
837 EventDescription::new(|| task.get_task_desc_fn()),
838 note,
839 );
840
841 let old = task.set_in_progress(in_progress_state);
844 debug_assert!(old.is_none(), "InProgress already exists");
845 ctx.schedule_task(task, TaskPriority::Initial);
846
847 Ok(Err(listener))
848 }
849
    /// Tries to read cell `cell` of task `task_id`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the caller must wait for a (re)computation, and
    /// `Err(_)` when the cell no longer exists or the task was canceled. When
    /// `reader` is given and tracking applies, a cell dependency edge is
    /// recorded.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records a cell dependency edge from `reader` to `task_id`/`cell`,
        // skipped for immutable tasks (their cells can never invalidate the
        // reader). `task` is dropped before mutating the reader to keep only
        // one guard locked at a time.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // Safe: reader_task being Some implies reader is Some (see caller).
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Only lock the reader task when dependencies are tracked, the read is
        // tracked, and the reader differs from the task being read.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the content out of the cell instead of cloning.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // Content missing: if the task is already scheduled/running, just wait
        // for the cell to be filled.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Distinguish "cell will reappear after recompute" from "cell is gone":
        // a cell index beyond what the task ever produced for this type is an error.
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            // Still track the failed read so the reader is invalidated if the
            // cell comes back.
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // Register interest in the cell; only the first listener triggers the
        // recomputation below.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        // Canceled tasks cannot be recomputed.
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
987
988 fn listen_to_cell(
989 &self,
990 task: &mut impl TaskGuard,
991 task_id: TaskId,
992 reader_task: &Option<impl TaskGuard>,
993 cell: CellId,
994 ) -> (EventListener, bool) {
995 let note = || {
996 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
997 move || {
998 if let Some(reader_desc) = reader_desc.as_ref() {
999 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
1000 } else {
1001 "try_read_task_cell (in progress, untracked)".to_string()
1002 }
1003 }
1004 };
1005 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
1006 let listener = in_progress.event.listen_with_note(note);
1008 return (listener, false);
1009 }
1010 let in_progress = InProgressCellState::new(task_id, cell);
1011 let listener = in_progress.event.listen_with_note(note);
1012 let old = task.insert_in_progress_cells(cell, in_progress);
1013 debug_assert!(old.is_none(), "InProgressCell already exists");
1014 (listener, true)
1015 }
1016
1017 fn snapshot_and_persist(
1018 &self,
1019 parent_span: Option<tracing::Id>,
1020 reason: &str,
1021 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1022 ) -> Option<(Instant, bool)> {
1023 let snapshot_span =
1024 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1025 .entered();
1026 let start = Instant::now();
1027 let wall_start = SystemTime::now();
1031 debug_assert!(self.should_persist());
1032
1033 let suspended_operations;
1034 {
1035 let _span = tracing::info_span!("blocking").entered();
1036 let mut snapshot_request = self.snapshot_request.lock();
1037 snapshot_request.snapshot_requested = true;
1038 let active_operations = self
1039 .in_progress_operations
1040 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1041 if active_operations != 0 {
1042 self.operations_suspended
1043 .wait_while(&mut snapshot_request, |_| {
1044 self.in_progress_operations.load(Ordering::Relaxed)
1045 != SNAPSHOT_REQUESTED_BIT
1046 });
1047 }
1048 suspended_operations = snapshot_request
1049 .suspended_operations
1050 .iter()
1051 .map(|op| op.arc().clone())
1052 .collect::<Vec<_>>();
1053 }
1054 self.storage.start_snapshot();
1055 let mut persisted_task_cache_log = self
1056 .persisted_task_cache_log
1057 .as_ref()
1058 .map(|l| l.take(|i| i))
1059 .unwrap_or_default();
1060 let mut snapshot_request = self.snapshot_request.lock();
1061 snapshot_request.snapshot_requested = false;
1062 self.in_progress_operations
1063 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1064 self.snapshot_completed.notify_all();
1065 let snapshot_time = Instant::now();
1066 drop(snapshot_request);
1067
1068 #[cfg(feature = "print_cache_item_size")]
1069 #[derive(Default)]
1070 struct TaskCacheStats {
1071 data: usize,
1072 #[cfg(feature = "print_cache_item_size_with_compressed")]
1073 data_compressed: usize,
1074 data_count: usize,
1075 meta: usize,
1076 #[cfg(feature = "print_cache_item_size_with_compressed")]
1077 meta_compressed: usize,
1078 meta_count: usize,
1079 upper_count: usize,
1080 collectibles_count: usize,
1081 aggregated_collectibles_count: usize,
1082 children_count: usize,
1083 followers_count: usize,
1084 collectibles_dependents_count: usize,
1085 aggregated_dirty_containers_count: usize,
1086 output_size: usize,
1087 }
1088 #[cfg(feature = "print_cache_item_size")]
1091 struct FormatSizes {
1092 size: usize,
1093 #[cfg(feature = "print_cache_item_size_with_compressed")]
1094 compressed_size: usize,
1095 }
1096 #[cfg(feature = "print_cache_item_size")]
1097 impl std::fmt::Display for FormatSizes {
1098 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1099 use turbo_tasks::util::FormatBytes;
1100 #[cfg(feature = "print_cache_item_size_with_compressed")]
1101 {
1102 write!(
1103 f,
1104 "{} ({} compressed)",
1105 FormatBytes(self.size),
1106 FormatBytes(self.compressed_size)
1107 )
1108 }
1109 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1110 {
1111 write!(f, "{}", FormatBytes(self.size))
1112 }
1113 }
1114 }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compresses `data` into a throwaway buffer and returns the
            /// compressed length, used to estimate on-disk size.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Records one encoded "data"-category payload.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    // Compression failures only skew the printed stats; treat as 0.
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Records one encoded "meta"-category payload.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    // Compression failures only skew the printed stats; treat as 0.
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Accumulates per-task graph/collectible counts from the task's
            /// storage, plus the bincode-encoded size of its output (if any).
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    // Encoding failures count as size 0 rather than aborting stats.
                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Grouping key for the stats table: the persistent task type name.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Sort key for the printed table: total bytes (compressed sizes
            /// when tracked, raw sizes otherwise).
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            /// Combined data + meta sizes for the "Total Size" column.
            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            /// Data-category size for the "Data Size" column.
            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average data size per item; `checked_div` guards a zero count.
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            /// Meta-category size for the "Meta Size" column.
            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average meta size per item; `checked_div` guards a zero count.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
1229 #[cfg(feature = "print_cache_item_size")]
1230 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1231 Mutex::new(FxHashMap::default());
1232
1233 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1240 let encode_category = |task_id: TaskId,
1241 data: &TaskStorage,
1242 category: SpecificTaskDataCategory,
1243 buffer: &mut TurboBincodeBuffer|
1244 -> Option<TurboBincodeBuffer> {
1245 match encode_task_data(task_id, data, category, buffer) {
1246 Ok(encoded) => {
1247 #[cfg(feature = "print_cache_item_size")]
1248 {
1249 let mut stats = task_cache_stats.lock();
1250 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1251 match category {
1252 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1253 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1254 }
1255 }
1256 Some(encoded)
1257 }
1258 Err(err) => {
1259 eprintln!(
1260 "Serializing task {} failed ({:?}): {:?}",
1261 self.debug_get_task_description(task_id),
1262 category,
1263 err
1264 );
1265 None
1266 }
1267 }
1268 };
1269 if task_id.is_transient() {
1270 unreachable!("transient task_ids should never be enqueued to be persisted");
1271 }
1272
1273 let encode_meta = inner.flags.meta_modified();
1274 let encode_data = inner.flags.data_modified();
1275
1276 #[cfg(feature = "print_cache_item_size")]
1277 if encode_data || encode_meta {
1278 task_cache_stats
1279 .lock()
1280 .entry(TaskCacheStats::task_name(inner))
1281 .or_default()
1282 .add_counts(inner);
1283 }
1284
1285 let meta = if encode_meta {
1286 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1287 } else {
1288 None
1289 };
1290
1291 let data = if encode_data {
1292 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1293 } else {
1294 None
1295 };
1296
1297 SnapshotItem {
1298 task_id,
1299 meta,
1300 data,
1301 }
1302 };
1303
1304 let task_snapshots = self.storage.take_snapshot(&process);
1306
1307 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1308
1309 drop(snapshot_span);
1310 let snapshot_duration = start.elapsed();
1311 let task_count = task_snapshots.len();
1312
1313 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1314 return Some((snapshot_time, false));
1315 }
1316
1317 let persist_start = Instant::now();
1318 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1319 {
1320 if let Err(err) = self.backing_storage.save_snapshot(
1321 suspended_operations,
1322 persisted_task_cache_log,
1323 task_snapshots,
1324 ) {
1325 eprintln!("Persisting failed: {err:?}");
1326 return None;
1327 }
1328 #[cfg(feature = "print_cache_item_size")]
1329 {
1330 let mut task_cache_stats = task_cache_stats
1331 .into_inner()
1332 .into_iter()
1333 .collect::<Vec<_>>();
1334 if !task_cache_stats.is_empty() {
1335 use turbo_tasks::util::FormatBytes;
1336
1337 use crate::utils::markdown_table::print_markdown_table;
1338
1339 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1340 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1341 });
1342
1343 println!(
1344 "Task cache stats: {}",
1345 FormatSizes {
1346 size: task_cache_stats
1347 .iter()
1348 .map(|(_, s)| s.data + s.meta)
1349 .sum::<usize>(),
1350 #[cfg(feature = "print_cache_item_size_with_compressed")]
1351 compressed_size: task_cache_stats
1352 .iter()
1353 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1354 .sum::<usize>()
1355 },
1356 );
1357
1358 print_markdown_table(
1359 [
1360 "Task",
1361 " Total Size",
1362 " Data Size",
1363 " Data Count x Avg",
1364 " Data Count x Avg",
1365 " Meta Size",
1366 " Meta Count x Avg",
1367 " Meta Count x Avg",
1368 " Uppers",
1369 " Coll",
1370 " Agg Coll",
1371 " Children",
1372 " Followers",
1373 " Coll Deps",
1374 " Agg Dirty",
1375 " Output Size",
1376 ],
1377 task_cache_stats.iter(),
1378 |(task_desc, stats)| {
1379 [
1380 task_desc.to_string(),
1381 format!(" {}", stats.format_total()),
1382 format!(" {}", stats.format_data()),
1383 format!(" {} x", stats.data_count),
1384 format!("{}", stats.format_avg_data()),
1385 format!(" {}", stats.format_meta()),
1386 format!(" {} x", stats.meta_count),
1387 format!("{}", stats.format_avg_meta()),
1388 format!(" {}", stats.upper_count),
1389 format!(" {}", stats.collectibles_count),
1390 format!(" {}", stats.aggregated_collectibles_count),
1391 format!(" {}", stats.children_count),
1392 format!(" {}", stats.followers_count),
1393 format!(" {}", stats.collectibles_dependents_count),
1394 format!(" {}", stats.aggregated_dirty_containers_count),
1395 format!(" {}", FormatBytes(stats.output_size)),
1396 ]
1397 },
1398 );
1399 }
1400 }
1401 }
1402
1403 let elapsed = start.elapsed();
1404 let persist_duration = persist_start.elapsed();
1405 if elapsed > Duration::from_secs(10) {
1407 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1408 "Finished writing to filesystem cache".to_string(),
1409 elapsed,
1410 )));
1411 }
1412
1413 let wall_start_ms = wall_start
1414 .duration_since(SystemTime::UNIX_EPOCH)
1415 .unwrap_or_default()
1416 .as_secs_f64()
1418 * 1000.0;
1419 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1420 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1421 "turbopack-persistence",
1422 wall_start_ms,
1423 wall_end_ms,
1424 vec![
1425 ("reason", serde_json::Value::from(reason)),
1426 (
1427 "snapshot_duration_ms",
1428 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1429 ),
1430 (
1431 "persist_duration_ms",
1432 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1433 ),
1434 ("task_count", serde_json::Value::from(task_count)),
1435 ],
1436 )));
1437
1438 Some((snapshot_time, true))
1439 }
1440
    /// Restores persisted state on backend start: replays any operations that
    /// were interrupted mid-write in the previous session, then (in read-write
    /// mode) schedules the periodic snapshot background job.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Operations that were suspended/unfinished when the previous
            // process stopped must be re-executed to bring the persisted
            // graph back into a consistent state.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // NOTE(review): the second `_span` shadows the first, but the first
            // guard stays alive until the end of this block, so both spans wrap
            // only the scheduling call — presumably intentional nesting; confirm.
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1467
    /// Signals that the backend is shutting down. The flag is stored before
    /// the event is notified so that woken waiters observe `stopping == true`.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1472
1473 #[allow(unused_variables)]
1474 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1475 #[cfg(feature = "verify_aggregation_graph")]
1476 {
1477 self.is_idle.store(false, Ordering::Release);
1478 self.verify_aggregation_graph(turbo_tasks, false);
1479 }
1480 if self.should_persist() {
1481 self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1482 }
1483 drop_contents(&self.task_cache);
1484 self.storage.drop_contents();
1485 if let Err(err) = self.backing_storage.shutdown() {
1486 println!("Shutting down failed: {err}");
1487 }
1488 }
1489
    /// Called when the system becomes idle. Notifies idle waiters and, with
    /// `verify_aggregation_graph`, spawns a delayed verification task that is
    /// aborted if idleness ends within 5 seconds.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Debounce: wait 5s, but bail out immediately if idle ends first.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag in case idleness ended between the sleep
                // completing and this check.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1517
    /// Called when the system leaves the idle state; clears the idle flag
    /// (verification builds only) and wakes the debounce task in `idle_start`.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1523
    /// Looks up or creates the persistent task for `task_type`, connects it as
    /// a child of `parent_task` (if any), and returns its id.
    ///
    /// Lookup order: in-memory `task_cache` → backing storage via
    /// `task_by_type` → allocate a fresh id. Insertions race with other
    /// threads, so each vacant-entry path re-checks via `raw_entry`.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;
        let is_session_dependent = task_type.native_fn.is_session_dependent;
        let mut ctx = self.execute_context(turbo_tasks);
        // Fast path: already present in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            operation::ConnectChildOperation::run(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                ctx,
            );
            return task_id;
        }

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Known to the backing storage; populate the in-memory cache
            // unless another thread inserted it in the meantime.
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    // Another thread created the task while we queried storage.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    // Truly new task: allocate a persistent id and record the
                    // cache entry in the persistence log (if enabled).
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    e.insert(task_type.clone(), task_id);
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    if let Some(log) = &self.persisted_task_cache_log {
                        log.lock(task_id).push((task_type.clone(), task_id));
                    }
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        if is_new {
            self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx);
        }
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1598
    /// Looks up or creates a transient task for `task_type` and connects it to
    /// `parent_task`. Panics (via `panic_persistent_calling_transient`) when a
    /// persistent parent tries to call a transient task, since that edge could
    /// not be persisted.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;
        let is_session_dependent = task_type.native_fn.is_session_dependent;

        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        let mut ctx = self.execute_context(turbo_tasks);
        // Fast path: already in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            operation::ConnectChildOperation::run(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                ctx,
            );
            return task_id;
        }
        // Slow path: re-check via raw_entry since another thread may have
        // inserted the entry between the lookup above and here.
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                operation::ConnectChildOperation::run(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    ctx,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                // Truly new transient task: allocate a transient id.
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                self.set_initial_aggregation_number(
                    task_id,
                    is_root,
                    is_session_dependent,
                    &mut ctx,
                );

                operation::ConnectChildOperation::run(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    ctx,
                );

                task_id
            }
        }
    }
1669
1670 fn debug_trace_transient_task(
1673 &self,
1674 task_type: &CachedTaskType,
1675 cell_id: Option<CellId>,
1676 ) -> DebugTraceTransientTask {
1677 fn inner_id(
1680 backend: &TurboTasksBackendInner<impl BackingStorage>,
1681 task_id: TaskId,
1682 cell_type_id: Option<ValueTypeId>,
1683 visited_set: &mut FxHashSet<TaskId>,
1684 ) -> DebugTraceTransientTask {
1685 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1686 if visited_set.contains(&task_id) {
1687 let task_name = task_type.get_name();
1688 DebugTraceTransientTask::Collapsed {
1689 task_name,
1690 cell_type_id,
1691 }
1692 } else {
1693 inner_cached(backend, &task_type, cell_type_id, visited_set)
1694 }
1695 } else {
1696 DebugTraceTransientTask::Uncached { cell_type_id }
1697 }
1698 }
1699 fn inner_cached(
1700 backend: &TurboTasksBackendInner<impl BackingStorage>,
1701 task_type: &CachedTaskType,
1702 cell_type_id: Option<ValueTypeId>,
1703 visited_set: &mut FxHashSet<TaskId>,
1704 ) -> DebugTraceTransientTask {
1705 let task_name = task_type.get_name();
1706
1707 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1708 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1709 return None;
1713 };
1714 if task_id.is_transient() {
1715 Some(Box::new(inner_id(
1716 backend,
1717 task_id,
1718 cause_self_raw_vc.try_get_type_id(),
1719 visited_set,
1720 )))
1721 } else {
1722 None
1723 }
1724 });
1725 let cause_args = task_type
1726 .arg
1727 .get_raw_vcs()
1728 .into_iter()
1729 .filter_map(|raw_vc| {
1730 let Some(task_id) = raw_vc.try_get_task_id() else {
1731 return None;
1733 };
1734 if !task_id.is_transient() {
1735 return None;
1736 }
1737 Some((task_id, raw_vc.try_get_type_id()))
1738 })
1739 .collect::<IndexSet<_>>() .into_iter()
1741 .map(|(task_id, cell_type_id)| {
1742 inner_id(backend, task_id, cell_type_id, visited_set)
1743 })
1744 .collect();
1745
1746 DebugTraceTransientTask::Cached {
1747 task_name,
1748 cell_type_id,
1749 cause_self,
1750 cause_args,
1751 }
1752 }
1753 inner_cached(
1754 self,
1755 task_type,
1756 cell_id.map(|c| c.type_id),
1757 &mut FxHashSet::default(),
1758 )
1759 }
1760
1761 fn invalidate_task(
1762 &self,
1763 task_id: TaskId,
1764 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1765 ) {
1766 if !self.should_track_dependencies() {
1767 panic!("Dependency tracking is disabled so invalidation is not allowed");
1768 }
1769 operation::InvalidateOperation::run(
1770 smallvec![task_id],
1771 #[cfg(feature = "trace_task_dirty")]
1772 TaskDirtyCause::Invalidator,
1773 self.execute_context(turbo_tasks),
1774 );
1775 }
1776
1777 fn invalidate_tasks(
1778 &self,
1779 tasks: &[TaskId],
1780 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1781 ) {
1782 if !self.should_track_dependencies() {
1783 panic!("Dependency tracking is disabled so invalidation is not allowed");
1784 }
1785 operation::InvalidateOperation::run(
1786 tasks.iter().copied().collect(),
1787 #[cfg(feature = "trace_task_dirty")]
1788 TaskDirtyCause::Unknown,
1789 self.execute_context(turbo_tasks),
1790 );
1791 }
1792
1793 fn invalidate_tasks_set(
1794 &self,
1795 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1796 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1797 ) {
1798 if !self.should_track_dependencies() {
1799 panic!("Dependency tracking is disabled so invalidation is not allowed");
1800 }
1801 operation::InvalidateOperation::run(
1802 tasks.iter().copied().collect(),
1803 #[cfg(feature = "trace_task_dirty")]
1804 TaskDirtyCause::Unknown,
1805 self.execute_context(turbo_tasks),
1806 );
1807 }
1808
1809 fn invalidate_serialization(
1810 &self,
1811 task_id: TaskId,
1812 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1813 ) {
1814 if task_id.is_transient() {
1815 return;
1816 }
1817 let mut ctx = self.execute_context(turbo_tasks);
1818 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1819 task.invalidate_serialization();
1820 }
1821
1822 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1823 let task = self.storage.access_mut(task_id);
1824 if let Some(value) = task.get_persistent_task_type() {
1825 format!("{task_id:?} {}", value)
1826 } else if let Some(value) = task.get_transient_task_type() {
1827 format!("{task_id:?} {}", value)
1828 } else {
1829 format!("{task_id:?} unknown")
1830 }
1831 }
1832
1833 fn get_task_name(
1834 &self,
1835 task_id: TaskId,
1836 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1837 ) -> String {
1838 let mut ctx = self.execute_context(turbo_tasks);
1839 let task = ctx.task(task_id, TaskDataCategory::Data);
1840 if let Some(value) = task.get_persistent_task_type() {
1841 value.to_string()
1842 } else if let Some(value) = task.get_transient_task_type() {
1843 value.to_string()
1844 } else {
1845 "unknown".to_string()
1846 }
1847 }
1848
1849 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1850 let task = self.storage.access_mut(task_id);
1851 task.get_persistent_task_type().cloned()
1852 }
1853
    /// Transitions the task to the `Canceled` state. Any previous in-progress
    /// state is removed first, waking everything waiting on its done event so
    /// waiters are not left hanging on a task that will never finish.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // `take_in_progress` above cleared the slot, so this insert must not
        // find an existing state.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
    }
1876
    /// Attempts to begin executing `task_id`. Returns `None` when the task is
    /// not in the `Scheduled` state (not scheduled, already running, or
    /// canceled); otherwise transitions it to `InProgress`, marks its current
    /// collectibles and dependencies as outdated (to be diffed after the run),
    /// and returns the future + tracing span to poll.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prefetching may need other task locks, so release this task's
            // lock, prefetch, and re-acquire.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled (e.g. already InProgress or Canceled): put the
                // state back untouched and decline to start.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move all current collectibles to the "outdated" set; whatever
            // the new execution re-emits will be moved back, and the rest is
            // cleaned up afterwards. Outdated entries that are no longer
            // current are dropped here.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            // Snapshot the current dependency edges as "outdated"; edges not
            // re-established by the new execution will be removed when it
            // completes.
            if self.should_track_dependencies() {
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the execution future outside the task lock.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A Once task's future can only be taken a single time;
                    // `None` here means it already ran.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1975
    /// Finishes a task execution: stores the new output, invalidates tasks
    /// depending on it, connects newly discovered children, and cleans up
    /// outdated state.
    ///
    /// Returns `true` when the task became stale during execution and must be
    /// re-executed; `false` when the result was committed.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: diff old vs. new state under the task lock. `None` means
        // the task went stale and was rescheduled.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: the output changed, so everything that read it is dirty.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        // Phase 3: wire up children discovered during this execution.
        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 4: commit the output and final task state.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 5: drop outdated cells/data.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        // Dropped explicitly here — presumably to keep the (possibly costly)
        // drops out of the locked sections above; confirm before relying on it.
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2107
    /// First phase of completing a task execution, performed under the task
    /// lock: detects staleness (rescheduling the task and returning `None`),
    /// updates cell counters, collects outdated edges for cleanup, diffs the
    /// output against the previous one, and determines whether the task can
    /// become immutable.
    ///
    /// Returns `None` when the task went stale during execution; otherwise the
    /// data needed by the later completion phases.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task's result is discarded; report an empty (non-stale)
        // result so the caller does not reschedule it.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while running. Discard the
        // result, move it back to Scheduled, and release the active counts
        // that were taken for children which are not (yet) connected.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Sync the stored per-cell-type max indices with the counters observed
        // during this execution: update changed ones, remove vanished ones.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        // A task may become immutable only if it is session-independent, has
        // no invalidator and no collectible dependencies; in that case gather
        // the tasks it depends on to check their immutability below.
        let task_dependencies_for_immutable =
            if !is_immutable
                && !task.is_session_dependent()
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Children that were present before but not re-discovered are
            // outdated edges; re-discovered ones are removed from
            // `new_children` since they need no new connection.
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Diff the new result against the stored output; `None` means the
        // output is unchanged and dependents need not be invalidated.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        // Release the lock before touching other tasks below.
        drop(task);

        // The task becomes immutable only when all of its dependencies are
        // already immutable.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2353
    /// Invalidates all tasks that depend on the output of `task_id`, after that output
    /// changed.
    ///
    /// Each dependent is made dirty (and stale, unless its dependency edge is already
    /// outdated). `Once` transient tasks are never invalidated. When the dependent set is
    /// larger than `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD`, the work is split into
    /// chunks that run on parallel scoped threads, each with its own context and queue.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Prefetch task data when more than one dependent needs to be touched.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        // Marks a single dependent task dirty when it (still) depends on `task_id`'s
        // output; aggregation updates are collected into `queue`.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            // `Once` tasks run exactly once and must not be re-executed.
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // The edge is outdated: still mark dirty, but don't make the task stale.
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                // No backward edge at all: nothing to invalidate.
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Large dependent set: process in parallel chunks.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Small dependent set: process sequentially on this thread.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2450
    /// Marks all new children that do not yet have an output as dirty, so they get
    /// executed.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
            // A child with an output already completed at least once; only children
            // without one need to be made dirty.
            if !child_task.has_output() {
                let child_id = child_task.id();
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    false,
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        });

        queue.execute(ctx);
    }
2476
    /// Connects `new_children` as children of `task_id` after execution completed.
    ///
    /// Returns `true` when the task became stale during execution and was rescheduled
    /// instead; in that case the children's active counts are decreased rather than
    /// connecting them. Returns `false` when the task was canceled or the children were
    /// connected normally.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast path: a stale (non-once) task is immediately rescheduled; the new
        // children are not connected, only their active counts are given back.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        // Normal path: connect the children, passing along whether this task currently
        // holds an active count.
        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2544
    /// Final bookkeeping after a task execution completed: stores the new output, updates
    /// the task's dirtiness/session state, and notifies waiters.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task must run again (it became stale during
    ///   execution, or the `verify_determinism` re-run heuristic triggered).
    /// - `in_progress_cells` are the cell states that were in progress; their events have
    ///   already been notified.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        debug_assert!(new_children.is_empty());

        // A stale (non-once) task is rescheduled instead of finishing.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Store the new output (when it changed); the replaced content is dropped
        // later, after the task lock is released.
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake up all readers waiting on cells that were updated during this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Capture the old self-dirty / current-session-clean state.
        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // Session-dependent tasks stay dirty but are clean for the current session; all
        // other tasks become fully clean after a successful execution.
        let session_dependent = task.is_session_dependent();
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // When the self-dirty or session-clean state changed, compute the aggregated
        // update to propagate to upper tasks.
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            // Effective dirty count decreased: the task might now be fully clean, so
            // notify all-clean waiters and drop the "active until clean" flag.
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                if let Some(activeness_state) = task.get_activeness_mut() {
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, persistent non-once tasks whose result changed get
        // re-executed to detect non-deterministic behavior.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Only signal completion after the task lock is released.
            done_event.notify(usize::MAX);
        }

        // Drop the replaced output content outside of the task lock.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2714
    /// Removes cell data that was not (re-)assigned during this execution.
    ///
    /// `cell_counters` maps each value type to the number of cells this execution
    /// produced; cells with an index at or above that count (or of a type not produced at
    /// all) are removed. When the execution ended in an error, the old cell data is kept.
    /// Returns the removed data so callers can drop it outside of the task lock.
    fn task_execution_completed_cleanup(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        is_error: bool,
    ) -> Vec<SharedReference> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let mut removed_cell_data = Vec::new();
        if !is_error {
            // Collect first, then remove, to avoid mutating while iterating.
            let to_remove_persistent: Vec<_> = task
                .iter_persistent_cell_data()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();

            let to_remove_transient: Vec<_> = task
                .iter_transient_cell_data()
                .filter_map(|(cell, _)| {
                    cell_counters
                        .get(&cell.type_id)
                        .is_none_or(|start_index| cell.index >= *start_index)
                        .then_some(*cell)
                })
                .collect();
            removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
            for cell in to_remove_persistent {
                if let Some(data) = task.remove_persistent_cell_data(&cell) {
                    removed_cell_data.push(data.into_untyped());
                }
            }
            for cell in to_remove_transient {
                if let Some(data) = task.remove_transient_cell_data(&cell) {
                    removed_cell_data.push(data);
                }
            }
        }

        task.cleanup_after_execution();

        drop(task);

        removed_cell_data
    }
2777
    /// Runs a background job for this backend.
    ///
    /// The snapshot jobs wait until the next snapshot is due (initial wait, regular
    /// interval, or an idle timeout), persist a snapshot, optionally compact the backing
    /// database while the system stays idle, and then re-schedule themselves as a
    /// follow-up snapshot job. Stopping the backend aborts the wait.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `last_snapshot` is stored as milliseconds since `start_time`.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        // Wait until the snapshot is due, waking early when the system
                        // becomes idle (idle timeout) or is stopping.
                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Re-check: idleness may have ended in between.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        let snapshot =
                            this.snapshot_and_persist(background_span.id(), reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // While still idle, run a bounded number of database
                            // compaction passes; stop as soon as idleness ends.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                // Non-blocking (biased) check whether idleness ended.
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    Ok(true) => {}
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            if !new_data {
                                // Nothing was persisted: keep looping without updating
                                // the stored snapshot time.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2910
2911 fn try_read_own_task_cell(
2912 &self,
2913 task_id: TaskId,
2914 cell: CellId,
2915 options: ReadCellOptions,
2916 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2917 ) -> Result<TypedCellContent> {
2918 let mut ctx = self.execute_context(turbo_tasks);
2919 let task = ctx.task(task_id, TaskDataCategory::Data);
2920 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2921 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2922 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2923 } else {
2924 Ok(CellContent(None).into_typed(cell.type_id))
2925 }
2926 }
2927
    /// Reads all collectibles of `collectible_type` aggregated at `task_id`.
    ///
    /// The task is first upgraded to a root aggregation node so that all collectibles of
    /// its subgraph are aggregated at it. When `reader_id` is given, dependency edges are
    /// recorded in both directions so the reader gets invalidated on collectible changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Ensure the task is a root aggregation node; the lock is released while the
            // aggregation number update runs and the check repeats until it took effect.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the subgraph.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles emitted by the task itself (signed counts).
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Record the forward dependency on the reader side (or refresh an outdated
            // one).
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2992
2993 fn emit_collectible(
2994 &self,
2995 collectible_type: TraitTypeId,
2996 collectible: RawVc,
2997 task_id: TaskId,
2998 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2999 ) {
3000 self.assert_valid_collectible(task_id, collectible);
3001
3002 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3003 panic!("Collectibles need to be resolved");
3004 };
3005 let cell = CellRef {
3006 task: collectible_task,
3007 cell,
3008 };
3009 operation::UpdateCollectibleOperation::run(
3010 task_id,
3011 CollectibleRef {
3012 collectible_type,
3013 cell,
3014 },
3015 1,
3016 self.execute_context(turbo_tasks),
3017 );
3018 }
3019
3020 fn unemit_collectible(
3021 &self,
3022 collectible_type: TraitTypeId,
3023 collectible: RawVc,
3024 count: u32,
3025 task_id: TaskId,
3026 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3027 ) {
3028 self.assert_valid_collectible(task_id, collectible);
3029
3030 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3031 panic!("Collectibles need to be resolved");
3032 };
3033 let cell = CellRef {
3034 task: collectible_task,
3035 cell,
3036 };
3037 operation::UpdateCollectibleOperation::run(
3038 task_id,
3039 CollectibleRef {
3040 collectible_type,
3041 cell,
3042 },
3043 -(i32::try_from(count).unwrap()),
3044 self.execute_context(turbo_tasks),
3045 );
3046 }
3047
    /// Updates the content of one cell of `task_id` by running an `UpdateCellOperation`.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3068
    /// Marks the given task as completed from within its own execution.
    ///
    /// Sets the `marked_as_completed` flag only when the task is currently in progress;
    /// otherwise this is a no-op.
    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = task.get_in_progress_mut()
        {
            *marked_as_completed = true;
        }
    }
3088
    /// Connects `task` as a child of `parent_task` (or as a root when `parent_task` is
    /// `None`). Panics when a persistent parent tries to connect to a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3098
    /// Creates a new transient task of the given type and returns its id.
    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        {
            let mut task = self.storage.access_mut(task_id);
            task.init_transient_task(task_id, task_type, self.should_track_activeness());
        }
        // Track root tasks so the aggregation graph can be verified starting from them.
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }
3109
    /// Disposes a root task.
    ///
    /// When the task or its subgraph is still dirty, the task's root activeness is
    /// replaced by an "active until clean" state; otherwise its activeness is removed and
    /// waiters on the all-clean event are notified immediately.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3134
    /// Debug-only consistency check of the aggregation graph (feature
    /// `verify_aggregation_graph`).
    ///
    /// Walks the task graph from every known root task and panics when a reachable task
    /// does not report to any aggregated node, when a dirty task is missing from its
    /// uppers' dirty-container lists, or when collectibles are inconsistent. When `idle`
    /// is set, the check aborts as soon as the system stops being idle.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        // Escape hatch: `TURBO_ENGINE_VERIFY_GRAPH=0` disables the check entirely.
        // (The `use` below is a function-scoped item, so it is in scope here too.)
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (emitted by some reachable task, aggregated-in task ids)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Print progress on very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // Abort when idleness ended — the check would race with real work.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // A dirty task must be listed as a dirty container in every upper.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Report collectibles that are aggregated somewhere but not emitted by any
            // reachable task, printing the upper chain for debugging before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3323
3324 fn assert_not_persistent_calling_transient(
3325 &self,
3326 parent_id: Option<TaskId>,
3327 child_id: TaskId,
3328 cell_id: Option<CellId>,
3329 ) {
3330 if let Some(parent_id) = parent_id
3331 && !parent_id.is_transient()
3332 && child_id.is_transient()
3333 {
3334 self.panic_persistent_calling_transient(
3335 self.debug_get_task_description(parent_id),
3336 self.debug_get_cached_task_type(child_id).as_deref(),
3337 cell_id,
3338 );
3339 }
3340 }
3341
3342 fn panic_persistent_calling_transient(
3343 &self,
3344 parent: String,
3345 child: Option<&CachedTaskType>,
3346 cell_id: Option<CellId>,
3347 ) {
3348 let transient_reason = if let Some(child) = child {
3349 Cow::Owned(format!(
3350 " The callee is transient because it depends on:\n{}",
3351 self.debug_trace_transient_task(child, cell_id),
3352 ))
3353 } else {
3354 Cow::Borrowed("")
3355 };
3356 panic!(
3357 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3358 parent,
3359 child.map_or("unknown", |t| t.get_name()),
3360 transient_reason,
3361 );
3362 }
3363
    /// Asserts that `collectible` is a resolved task cell and that a transient collectible
    /// is not emitted from a persistent task. Panics with a diagnostic message otherwise.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Not a resolved cell: include the producing task's description when known.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3395}
3396
3397impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Delegates to the inner backend implementation.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3401
    /// Delegates to the inner backend implementation.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3405
    /// Delegates to the inner backend implementation.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3409
    /// Delegates to the inner backend implementation.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3413
    /// Delegates to the inner backend implementation.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3417
    /// Delegates to the inner backend implementation.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }
3427
    /// Delegates to the inner backend implementation.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }
3437
    /// Delegates to the inner backend implementation.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3441
    /// Delegates to the inner backend implementation.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3445
    /// Delegates to the inner backend implementation.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }
3453
    /// Delegates to the inner backend implementation.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }
3461
3462 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3463 self.0.task_execution_canceled(task, turbo_tasks)
3464 }
3465
3466 fn try_start_task_execution(
3467 &self,
3468 task_id: TaskId,
3469 priority: TaskPriority,
3470 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3471 ) -> Option<TaskExecutionSpec<'_>> {
3472 self.0
3473 .try_start_task_execution(task_id, priority, turbo_tasks)
3474 }
3475
3476 fn task_execution_completed(
3477 &self,
3478 task_id: TaskId,
3479 result: Result<RawVc, TurboTasksExecutionError>,
3480 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3481 #[cfg(feature = "verify_determinism")] stateful: bool,
3482 has_invalidator: bool,
3483 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3484 ) -> bool {
3485 self.0.task_execution_completed(
3486 task_id,
3487 result,
3488 cell_counters,
3489 #[cfg(feature = "verify_determinism")]
3490 stateful,
3491 has_invalidator,
3492 turbo_tasks,
3493 )
3494 }
3495
3496 type BackendJob = TurboTasksBackendJob;
3497
3498 fn run_backend_job<'a>(
3499 &'a self,
3500 job: Self::BackendJob,
3501 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3502 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3503 self.0.run_backend_job(job, turbo_tasks)
3504 }
3505
3506 fn try_read_task_output(
3507 &self,
3508 task_id: TaskId,
3509 reader: Option<TaskId>,
3510 options: ReadOutputOptions,
3511 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3512 ) -> Result<Result<RawVc, EventListener>> {
3513 self.0
3514 .try_read_task_output(task_id, reader, options, turbo_tasks)
3515 }
3516
3517 fn try_read_task_cell(
3518 &self,
3519 task_id: TaskId,
3520 cell: CellId,
3521 reader: Option<TaskId>,
3522 options: ReadCellOptions,
3523 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3524 ) -> Result<Result<TypedCellContent, EventListener>> {
3525 self.0
3526 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3527 }
3528
3529 fn try_read_own_task_cell(
3530 &self,
3531 task_id: TaskId,
3532 cell: CellId,
3533 options: ReadCellOptions,
3534 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3535 ) -> Result<TypedCellContent> {
3536 self.0
3537 .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3538 }
3539
3540 fn read_task_collectibles(
3541 &self,
3542 task_id: TaskId,
3543 collectible_type: TraitTypeId,
3544 reader: Option<TaskId>,
3545 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3546 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3547 self.0
3548 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3549 }
3550
3551 fn emit_collectible(
3552 &self,
3553 collectible_type: TraitTypeId,
3554 collectible: RawVc,
3555 task_id: TaskId,
3556 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3557 ) {
3558 self.0
3559 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3560 }
3561
3562 fn unemit_collectible(
3563 &self,
3564 collectible_type: TraitTypeId,
3565 collectible: RawVc,
3566 count: u32,
3567 task_id: TaskId,
3568 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3569 ) {
3570 self.0
3571 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3572 }
3573
3574 fn update_task_cell(
3575 &self,
3576 task_id: TaskId,
3577 cell: CellId,
3578 is_serializable_cell_content: bool,
3579 content: CellContent,
3580 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3581 verification_mode: VerificationMode,
3582 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3583 ) {
3584 self.0.update_task_cell(
3585 task_id,
3586 cell,
3587 is_serializable_cell_content,
3588 content,
3589 updated_key_hashes,
3590 verification_mode,
3591 turbo_tasks,
3592 );
3593 }
3594
3595 fn mark_own_task_as_finished(
3596 &self,
3597 task_id: TaskId,
3598 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3599 ) {
3600 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3601 }
3602
3603 fn connect_task(
3604 &self,
3605 task: TaskId,
3606 parent_task: Option<TaskId>,
3607 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3608 ) {
3609 self.0.connect_task(task, parent_task, turbo_tasks);
3610 }
3611
3612 fn create_transient_task(
3613 &self,
3614 task_type: TransientTaskType,
3615 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3616 ) -> TaskId {
3617 self.0.create_transient_task(task_type)
3618 }
3619
3620 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3621 self.0.dispose_root_task(task_id, turbo_tasks);
3622 }
3623
3624 fn task_statistics(&self) -> &TaskStatisticsApi {
3625 &self.0.task_statistics
3626 }
3627
3628 fn is_tracking_dependencies(&self) -> bool {
3629 self.0.options.dependency_tracking
3630 }
3631
3632 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3633 self.0.get_task_name(task, turbo_tasks)
3634 }
3635}
3636
/// A node in a debug trace tree rendered via its `Display` impl; appears to
/// describe why a task (and transitively its causes) is transient — confirm
/// against construction sites, which are outside this view.
enum DebugTraceTransientTask {
    /// A known (cached) task together with the traced causes of its
    /// transience: the task it was derived from (`cause_self`) and any
    /// transient arguments (`cause_args`).
    Cached {
        task_name: &'static str,
        // Type of the cell read from this task, if a cell read is involved.
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A known task rendered without its causes; shown with a " (collapsed)"
    /// suffix. Presumably used to bound trace size or break cycles — confirm.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task whose metadata is unavailable; rendered as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3653
3654impl DebugTraceTransientTask {
3655 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3656 let indent = " ".repeat(level);
3657 f.write_str(&indent)?;
3658
3659 fn fmt_cell_type_id(
3660 f: &mut fmt::Formatter<'_>,
3661 cell_type_id: Option<ValueTypeId>,
3662 ) -> fmt::Result {
3663 if let Some(ty) = cell_type_id {
3664 write!(
3665 f,
3666 " (read cell of type {})",
3667 get_value_type(ty).ty.global_name
3668 )
3669 } else {
3670 Ok(())
3671 }
3672 }
3673
3674 match self {
3676 Self::Cached {
3677 task_name,
3678 cell_type_id,
3679 ..
3680 }
3681 | Self::Collapsed {
3682 task_name,
3683 cell_type_id,
3684 ..
3685 } => {
3686 f.write_str(task_name)?;
3687 fmt_cell_type_id(f, *cell_type_id)?;
3688 if matches!(self, Self::Collapsed { .. }) {
3689 f.write_str(" (collapsed)")?;
3690 }
3691 }
3692 Self::Uncached { cell_type_id } => {
3693 f.write_str("unknown transient task")?;
3694 fmt_cell_type_id(f, *cell_type_id)?;
3695 }
3696 }
3697 f.write_char('\n')?;
3698
3699 if let Self::Cached {
3701 cause_self,
3702 cause_args,
3703 ..
3704 } = self
3705 {
3706 if let Some(c) = cause_self {
3707 writeln!(f, "{indent} self:")?;
3708 c.fmt_indented(f, level + 1)?;
3709 }
3710 if !cause_args.is_empty() {
3711 writeln!(f, "{indent} args:")?;
3712 for c in cause_args {
3713 c.fmt_indented(f, level + 1)?;
3714 }
3715 }
3716 }
3717 Ok(())
3718 }
3719}
3720
impl fmt::Display for DebugTraceTransientTask {
    // Renders the whole trace tree starting at indentation level 0.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3726
/// Returns an `Instant` roughly 30 years from now, used as a sentinel
/// deadline that is effectively never reached.
fn far_future() -> Instant {
    // ~30 years in seconds: 86400 s/day * 365 days * 30 years.
    const THIRTY_YEARS: Duration = Duration::from_secs(86400 * 365 * 30);
    Instant::now() + THIRTY_YEARS
}
3735
3736fn encode_task_data(
3741 task: TaskId,
3742 data: &TaskStorage,
3743 category: SpecificTaskDataCategory,
3744 scratch_buffer: &mut TurboBincodeBuffer,
3745) -> Result<TurboBincodeBuffer> {
3746 scratch_buffer.clear();
3747 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3748 data.encode(category, &mut encoder)?;
3749
3750 if cfg!(feature = "verify_serialization") {
3751 TaskStorage::new()
3752 .decode(
3753 category,
3754 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3755 )
3756 .with_context(|| {
3757 format!(
3758 "expected to be able to decode serialized data for '{category:?}' information \
3759 for {task}"
3760 )
3761 })?;
3762 }
3763 Ok(SmallVec::from_slice(scratch_buffer))
3764}