1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33 backend::{
34 Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
35 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37 VerificationMode,
38 },
39 event::{Event, EventDescription, EventListener},
40 message_queue::{TimingEvent, TraceEvent},
41 registry::get_value_type,
42 scope::scope_and_block,
43 task_statistics::TaskStatisticsApi,
44 trace::TraceRawVcs,
45 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49 operation::AnyOperation,
50 storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55 backend::{
56 operation::{
57 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
59 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
60 connect_children, get_aggregation_number, get_uppers, is_root_node,
61 make_task_dirty_internal, prepare_new_children,
62 },
63 storage::Storage,
64 storage_schema::{TaskStorage, TaskStorageAccessors},
65 },
66 backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
67 data::{
68 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69 InProgressState, InProgressStateInner, OutputValue, TransientTask,
70 },
71 error::TaskError,
72 utils::{
73 arc_or_owned::ArcOrOwned,
74 dash_map_drop_contents::drop_contents,
75 dash_map_raw_entry::{RawEntry, raw_entry},
76 ptr_eq_arc::PtrEqArc,
77 shard_amount::compute_shard_amount,
78 },
79};
80
/// When a dirty-propagation affects at least this many dependent tasks, the
/// work is split up and parallelized instead of processed sequentially.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// The highest bit of `in_progress_operations` is used as a flag that a
/// snapshot has been requested; the remaining bits count running operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// How long the backend must be idle before a snapshot is taken.
/// Overridable via the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment
/// variable (unparsable values silently fall back to the 2s default).
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
97
/// Shared state (behind a mutex) coordinating a snapshot with in-flight
/// operations. See `operation_suspend_point` / `start_operation` for the
/// protocol.
struct SnapshotRequest {
    /// True while a snapshot is being prepared; operations must suspend until
    /// it is cleared again.
    snapshot_requested: bool,
    /// The operations that have suspended themselves at a suspend point, so
    /// the snapshot can persist their state.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
102
103impl SnapshotRequest {
104 fn new() -> Self {
105 Self {
106 snapshot_requested: false,
107 suspended_operations: FxHashSet::default(),
108 }
109 }
110}
111
/// Controls when the backend persists its state to the backing storage.
pub enum StorageMode {
    /// Restore from storage, but never write back.
    ReadOnly,
    /// Restore from storage and persist continuously (snapshots while
    /// running).
    ReadWrite,
    /// Restore from storage, but only persist once when shutting down.
    ReadWriteOnShutdown,
}
122
/// Construction-time configuration for [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Track data dependencies between tasks so invalidations propagate.
    /// Disabling this also disables `active_tracking` (see
    /// `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    /// Track which tasks are "active" (reachable from a root) for scheduling.
    pub active_tracking: bool,

    /// How (and whether) to persist to the backing storage; `None` disables
    /// both restoring and persisting.
    pub storage_mode: Option<StorageMode>,

    /// Worker count used to size internal sharding; `None` lets the backend
    /// decide (see `compute_shard_amount`).
    pub num_workers: Option<usize>,

    /// Use smaller initial allocations, trading throughput for memory.
    pub small_preallocation: bool,
}
147
148impl Default for BackendOptions {
149 fn default() -> Self {
150 Self {
151 dependency_tracking: true,
152 active_tracking: true,
153 storage_mode: Some(StorageMode::ReadWrite),
154 num_workers: None,
155 small_preallocation: false,
156 }
157 }
158}
159
/// Background jobs the backend schedules on the turbo-tasks runtime.
pub enum TurboTasksBackendJob {
    /// The first snapshot after startup.
    InitialSnapshot,
    /// Any subsequent periodic snapshot.
    FollowUpSnapshot,
}
164
/// Public handle to the backend; a cheaply clonable wrapper around the shared
/// inner state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
166
/// The actual backend state, shared behind an `Arc` by [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    // Used as the reference point for elapsed-time measurements.
    start_time: Instant,

    // Id ranges are split by TRANSIENT_TASK_BIT: persisted ids below it,
    // transient ids at or above it (see `new`).
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    // Maps a cached task's type (function + args) to its assigned id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    // Per-task data, sharded; see `Storage::new`.
    storage: Storage,

    // Count of currently running operations; the top bit
    // (SNAPSHOT_REQUESTED_BIT) doubles as the "snapshot requested" flag.
    in_progress_operations: AtomicUsize,

    snapshot_request: Mutex<SnapshotRequest>,
    // Signalled when the last active operation has suspended (snapshot may
    // proceed).
    operations_suspended: Condvar,
    // Signalled when the snapshot is done and operations may resume.
    snapshot_completed: Condvar,
    // Timestamp (relative to `start_time`) of the last completed snapshot.
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
211
212impl<B: BackingStorage> TurboTasksBackend<B> {
213 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
214 Self(Arc::new(TurboTasksBackendInner::new(
215 options,
216 backing_storage,
217 )))
218 }
219
220 pub fn backing_storage(&self) -> &B {
221 &self.0.backing_storage
222 }
223}
224
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the backend state. Task ids resume from the backing storage's
    /// next free id; transient ids live in a separate range starting at
    /// `TRANSIENT_TASK_BIT`.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // Activeness tracking depends on dependency tracking; force it off
        // when dependencies are not tracked.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persisted ids: [next_task_id, TRANSIENT_TASK_BIT).
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            // Transient ids: [TRANSIENT_TASK_BIT, TaskId::MAX].
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            task_cache: FxDashMap::default(),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates an [`ExecuteContext`] bound to this backend and the given
    /// turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// True when a snapshot has been requested (flag bit set) and this backend
    /// persists at all — i.e. running operations should suspend.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Cooperative suspend point for long-running operations. When a snapshot
    /// is pending, captures the operation's resumable state via `suspend`,
    /// parks until the snapshot completes, then resumes.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the request may have completed since
            // the fast-path check.
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                // Decrement the active-operation count; we're suspended now.
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                // Only the flag bit remains => we were the last active
                // operation; wake the snapshotter.
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Park until the snapshot clears the request.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Resume: re-register as active and deregister the suspended
                // operation.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Registers a new running operation. If a snapshot is in progress, blocks
    /// until it completes before returning the guard. The returned guard
    /// decrements the count on drop.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        // Without persistence there is never a snapshot to coordinate with.
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock; the snapshot may already be done.
            if snapshot_request.snapshot_requested {
                // Back out our increment and possibly wake the snapshotter.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// Whether this backend writes to the backing storage (continuously or on
    /// shutdown).
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// Whether this backend restores state from the backing storage.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task-cache hit for statistics (no-op unless statistics are
    /// enabled).
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task-cache miss for statistics (no-op unless statistics are
    /// enabled).
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }

    /// Converts a stored [`TaskError`] into a [`TurboTasksExecutionError`],
    /// recursively resolving error sources. For `TaskChain` errors, the
    /// original error is looked up from the last task in the chain and then
    /// wrapped in one `TaskContext` layer per chain entry (innermost last).
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                let task_id = chain.last().unwrap();
                // Fetch the root error from the last task's output, if it is
                // still stored there.
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                // The output may have been recomputed/cleared in the meantime;
                // fall back to a placeholder panic in that case.
                let error = error.map_or_else(
                    || {
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap in task-context layers, innermost chain entry first.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
427
/// RAII guard for a running operation; created by
/// `TurboTasksBackendInner::start_operation`. `backend` is `None` when the
/// backend does not persist (nothing to coordinate), in which case drop is a
/// no-op.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
431
impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            // Deregister this operation from the active count.
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            // If only the snapshot-request flag bit remains, we were the last
            // active operation — wake the waiting snapshotter.
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}
444
/// Intermediate result computed while preparing completion of a task
/// execution (consumer not visible in this chunk — presumably the
/// task-execution-completed path; verify against the rest of the file).
struct TaskExecutionCompletePrepareResult {
    /// Children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    /// Whether the task was determined to be immutable after this execution.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// The output to store, if it changed.
    pub new_output: Option<OutputValue>,
    /// Tasks depending on this task's output that need to be notified.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
454
455impl<B: BackingStorage> TurboTasksBackendInner<B> {
457 fn connect_child(
458 &self,
459 parent_task: Option<TaskId>,
460 child_task: TaskId,
461 task_type: Option<ArcOrOwned<CachedTaskType>>,
462 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
463 ) {
464 operation::ConnectChildOperation::run(
465 parent_task,
466 child_task,
467 task_type,
468 self.execute_context(turbo_tasks),
469 );
470 }
471
472 fn try_read_task_output(
473 self: &Arc<Self>,
474 task_id: TaskId,
475 reader: Option<TaskId>,
476 options: ReadOutputOptions,
477 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
478 ) -> Result<Result<RawVc, EventListener>> {
479 self.assert_not_persistent_calling_transient(reader, task_id, None);
480
481 let mut ctx = self.execute_context(turbo_tasks);
482 let need_reader_task = if self.should_track_dependencies()
483 && !matches!(options.tracking, ReadTracking::Untracked)
484 && reader.is_some_and(|reader_id| reader_id != task_id)
485 && let Some(reader_id) = reader
486 && reader_id != task_id
487 {
488 Some(reader_id)
489 } else {
490 None
491 };
492 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
493 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
497 (task, Some(reader))
498 } else {
499 (ctx.task(task_id, TaskDataCategory::All), None)
500 };
501
502 fn listen_to_done_event(
503 reader_description: Option<EventDescription>,
504 tracking: ReadTracking,
505 done_event: &Event,
506 ) -> EventListener {
507 done_event.listen_with_note(move || {
508 move || {
509 if let Some(reader_description) = reader_description.as_ref() {
510 format!(
511 "try_read_task_output from {} ({})",
512 reader_description, tracking
513 )
514 } else {
515 format!("try_read_task_output ({})", tracking)
516 }
517 }
518 })
519 }
520
521 fn check_in_progress(
522 task: &impl TaskGuard,
523 reader_description: Option<EventDescription>,
524 tracking: ReadTracking,
525 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
526 {
527 match task.get_in_progress() {
528 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
529 listen_to_done_event(reader_description, tracking, done_event),
530 ))),
531 Some(InProgressState::InProgress(box InProgressStateInner {
532 done_event, ..
533 })) => Some(Ok(Err(listen_to_done_event(
534 reader_description,
535 tracking,
536 done_event,
537 )))),
538 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
539 "{} was canceled",
540 task.get_task_description()
541 ))),
542 None => None,
543 }
544 }
545
546 if matches!(options.consistency, ReadConsistency::Strong) {
547 loop {
549 let aggregation_number = get_aggregation_number(&task);
550 if is_root_node(aggregation_number) {
551 break;
552 }
553 drop(task);
554 drop(reader_task);
555 {
556 let _span = tracing::trace_span!(
557 "make root node for strongly consistent read",
558 %task_id
559 )
560 .entered();
561 AggregationUpdateQueue::run(
562 AggregationUpdateJob::UpdateAggregationNumber {
563 task_id,
564 base_aggregation_number: u32::MAX,
565 distance: None,
566 },
567 &mut ctx,
568 );
569 }
570 (task, reader_task) = if let Some(reader_id) = need_reader_task {
571 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
573 (task, Some(reader))
574 } else {
575 (ctx.task(task_id, TaskDataCategory::All), None)
576 }
577 }
578
579 let is_dirty = task.is_dirty();
580
581 let has_dirty_containers = task.has_dirty_containers();
583 if has_dirty_containers || is_dirty.is_some() {
584 let activeness = task.get_activeness_mut();
585 let mut task_ids_to_schedule: Vec<_> = Vec::new();
586 let activeness = if let Some(activeness) = activeness {
588 activeness.set_active_until_clean();
592 activeness
593 } else {
594 if ctx.should_track_activeness() {
598 task_ids_to_schedule = task.dirty_containers().collect();
600 task_ids_to_schedule.push(task_id);
601 }
602 let activeness =
603 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
604 activeness.set_active_until_clean();
605 activeness
606 };
607 let listener = activeness.all_clean_event.listen_with_note(move || {
608 let this = self.clone();
609 let tt = turbo_tasks.pin();
610 move || {
611 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
612 let mut ctx = this.execute_context(tt);
613 let mut visited = FxHashSet::default();
614 fn indent(s: &str) -> String {
615 s.split_inclusive('\n')
616 .flat_map(|line: &str| [" ", line].into_iter())
617 .collect::<String>()
618 }
619 fn get_info(
620 ctx: &mut impl ExecuteContext<'_>,
621 task_id: TaskId,
622 parent_and_count: Option<(TaskId, i32)>,
623 visited: &mut FxHashSet<TaskId>,
624 ) -> String {
625 let task = ctx.task(task_id, TaskDataCategory::All);
626 let is_dirty = task.is_dirty();
627 let in_progress =
628 task.get_in_progress()
629 .map_or("not in progress", |p| match p {
630 InProgressState::InProgress(_) => "in progress",
631 InProgressState::Scheduled { .. } => "scheduled",
632 InProgressState::Canceled => "canceled",
633 });
634 let activeness = task.get_activeness().map_or_else(
635 || "not active".to_string(),
636 |activeness| format!("{activeness:?}"),
637 );
638 let aggregation_number = get_aggregation_number(&task);
639 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
640 {
641 let uppers = get_uppers(&task);
642 !uppers.contains(&parent_task_id)
643 } else {
644 false
645 };
646
647 let has_dirty_containers = task.has_dirty_containers();
649
650 let task_description = task.get_task_description();
651 let is_dirty_label = if let Some(parent_priority) = is_dirty {
652 format!(", dirty({parent_priority})")
653 } else {
654 String::new()
655 };
656 let has_dirty_containers_label = if has_dirty_containers {
657 ", dirty containers"
658 } else {
659 ""
660 };
661 let count = if let Some((_, count)) = parent_and_count {
662 format!(" {count}")
663 } else {
664 String::new()
665 };
666 let mut info = format!(
667 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
668 {in_progress}, \
669 {activeness}{is_dirty_label}{has_dirty_containers_label})",
670 );
671 let children: Vec<_> = task.dirty_containers_with_count().collect();
672 drop(task);
673
674 if missing_upper {
675 info.push_str("\n ERROR: missing upper connection");
676 }
677
678 if has_dirty_containers || !children.is_empty() {
679 writeln!(info, "\n dirty tasks:").unwrap();
680
681 for (child_task_id, count) in children {
682 let task_description = ctx
683 .task(child_task_id, TaskDataCategory::Data)
684 .get_task_description();
685 if visited.insert(child_task_id) {
686 let child_info = get_info(
687 ctx,
688 child_task_id,
689 Some((task_id, count)),
690 visited,
691 );
692 info.push_str(&indent(&child_info));
693 if !info.ends_with('\n') {
694 info.push('\n');
695 }
696 } else {
697 writeln!(
698 info,
699 " {child_task_id} {task_description} {count} \
700 (already visited)"
701 )
702 .unwrap();
703 }
704 }
705 }
706 info
707 }
708 let info = get_info(&mut ctx, task_id, None, &mut visited);
709 format!(
710 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
711 )
712 }
713 });
714 drop(reader_task);
715 drop(task);
716 if !task_ids_to_schedule.is_empty() {
717 let mut queue = AggregationUpdateQueue::new();
718 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
719 queue.execute(&mut ctx);
720 }
721
722 return Ok(Err(listener));
723 }
724 }
725
726 let reader_description = reader_task
727 .as_ref()
728 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
729 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
730 {
731 return value;
732 }
733
734 if let Some(output) = task.get_output() {
735 let result = match output {
736 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
737 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
738 OutputValue::Error(error) => Err(error.clone()),
739 };
740 if let Some(mut reader_task) = reader_task.take()
741 && options.tracking.should_track(result.is_err())
742 && (!task.immutable() || cfg!(feature = "verify_immutable"))
743 {
744 #[cfg(feature = "trace_task_output_dependencies")]
745 let _span = tracing::trace_span!(
746 "add output dependency",
747 task = %task_id,
748 dependent_task = ?reader
749 )
750 .entered();
751 let mut queue = LeafDistanceUpdateQueue::new();
752 let reader = reader.unwrap();
753 if task.add_output_dependent(reader) {
754 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
756 let reader_leaf_distance =
757 reader_task.get_leaf_distance().copied().unwrap_or_default();
758 if reader_leaf_distance.distance <= leaf_distance.distance {
759 queue.push(
760 reader,
761 leaf_distance.distance,
762 leaf_distance.max_distance_in_buffer,
763 );
764 }
765 }
766
767 drop(task);
768
769 if !reader_task.remove_outdated_output_dependencies(&task_id) {
775 let _ = reader_task.add_output_dependencies(task_id);
776 }
777 drop(reader_task);
778
779 queue.execute(&mut ctx);
780 } else {
781 drop(task);
782 }
783
784 return result.map_err(|error| {
785 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
786 .with_task_context(task_id, turbo_tasks.pin())
787 .into()
788 });
789 }
790 drop(reader_task);
791
792 let note = EventDescription::new(|| {
793 move || {
794 if let Some(reader) = reader_description.as_ref() {
795 format!("try_read_task_output (recompute) from {reader}",)
796 } else {
797 "try_read_task_output (recompute, untracked)".to_string()
798 }
799 }
800 });
801
802 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
804 TaskExecutionReason::OutputNotAvailable,
805 EventDescription::new(|| task.get_task_desc_fn()),
806 note,
807 );
808
809 let old = task.set_in_progress(in_progress_state);
812 debug_assert!(old.is_none(), "InProgress already exists");
813 ctx.schedule_task(task, TaskPriority::Initial);
814
815 Ok(Err(listener))
816 }
817
    /// Tries to read cell `cell` of `task_id` on behalf of `reader`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the caller must wait for the task to
    /// (re)compute the cell, and `Err(_)` when the cell can never become
    /// available again (type gone, index out of bounds, task canceled).
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the cell dependency edge on both sides. Skipped for
        // immutable tasks (their cells cannot invalidate the reader). Takes
        // the guards by value and drops them in a fixed order (read task
        // first, then reader).
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                // reader_task is only Some when reader was Some (see locking
                // below), so unwrap is safe.
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // If the edge was marked outdated, un-marking suffices;
                // otherwise record the forward dependency on the reader side.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Lock the reader alongside the task only for tracked, cross-task
        // reads with dependency tracking enabled.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // `final_read_hint` lets the caller promise this is the last read, so
        // the content can be moved out instead of copied.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // Cell missing but the task is running or scheduled: wait for it.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check whether the cell could ever exist again: a cell index below
        // the max index for its type may be recomputed; anything else is a
        // permanent error. The dependency is still recorded on error so the
        // reader is invalidated if the cell reappears.
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Cell may be recomputed: register a listener and, if we are the first
        // waiter, schedule the task.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
956
957 fn listen_to_cell(
958 &self,
959 task: &mut impl TaskGuard,
960 task_id: TaskId,
961 reader_task: &Option<impl TaskGuard>,
962 cell: CellId,
963 ) -> (EventListener, bool) {
964 let note = || {
965 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
966 move || {
967 if let Some(reader_desc) = reader_desc.as_ref() {
968 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
969 } else {
970 "try_read_task_cell (in progress, untracked)".to_string()
971 }
972 }
973 };
974 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
975 let listener = in_progress.event.listen_with_note(note);
977 return (listener, false);
978 }
979 let in_progress = InProgressCellState::new(task_id, cell);
980 let listener = in_progress.event.listen_with_note(note);
981 let old = task.insert_in_progress_cells(cell, in_progress);
982 debug_assert!(old.is_none(), "InProgressCell already exists");
983 (listener, true)
984 }
985
986 fn snapshot_and_persist(
987 &self,
988 parent_span: Option<tracing::Id>,
989 reason: &str,
990 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
991 ) -> Option<(Instant, bool)> {
992 let snapshot_span =
993 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
994 .entered();
995 let start = Instant::now();
996 let wall_start = SystemTime::now();
1000 debug_assert!(self.should_persist());
1001
1002 let suspended_operations;
1003 {
1004 let _span = tracing::info_span!("blocking").entered();
1005 let mut snapshot_request = self.snapshot_request.lock();
1006 snapshot_request.snapshot_requested = true;
1007 let active_operations = self
1008 .in_progress_operations
1009 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1010 if active_operations != 0 {
1011 self.operations_suspended
1012 .wait_while(&mut snapshot_request, |_| {
1013 self.in_progress_operations.load(Ordering::Relaxed)
1014 != SNAPSHOT_REQUESTED_BIT
1015 });
1016 }
1017 suspended_operations = snapshot_request
1018 .suspended_operations
1019 .iter()
1020 .map(|op| op.arc().clone())
1021 .collect::<Vec<_>>();
1022 }
1023 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
1026 let mut snapshot_request = self.snapshot_request.lock();
1027 snapshot_request.snapshot_requested = false;
1028 self.in_progress_operations
1029 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1030 self.snapshot_completed.notify_all();
1031 let snapshot_time = Instant::now();
1032 drop(snapshot_request);
1033
1034 if !has_modifications {
1035 drop(snapshot_guard);
1038 return Some((start, false));
1039 }
1040
1041 #[cfg(feature = "print_cache_item_size")]
1042 #[derive(Default)]
1043 struct TaskCacheStats {
1044 data: usize,
1045 #[cfg(feature = "print_cache_item_size_with_compressed")]
1046 data_compressed: usize,
1047 data_count: usize,
1048 meta: usize,
1049 #[cfg(feature = "print_cache_item_size_with_compressed")]
1050 meta_compressed: usize,
1051 meta_count: usize,
1052 upper_count: usize,
1053 collectibles_count: usize,
1054 aggregated_collectibles_count: usize,
1055 children_count: usize,
1056 followers_count: usize,
1057 collectibles_dependents_count: usize,
1058 aggregated_dirty_containers_count: usize,
1059 output_size: usize,
1060 }
1061 #[cfg(feature = "print_cache_item_size")]
1064 struct FormatSizes {
1065 size: usize,
1066 #[cfg(feature = "print_cache_item_size_with_compressed")]
1067 compressed_size: usize,
1068 }
1069 #[cfg(feature = "print_cache_item_size")]
1070 impl std::fmt::Display for FormatSizes {
1071 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1072 use turbo_tasks::util::FormatBytes;
1073 #[cfg(feature = "print_cache_item_size_with_compressed")]
1074 {
1075 write!(
1076 f,
1077 "{} ({} compressed)",
1078 FormatBytes(self.size),
1079 FormatBytes(self.compressed_size)
1080 )
1081 }
1082 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1083 {
1084 write!(f, "{}", FormatBytes(self.size))
1085 }
1086 }
1087 }
        // Accounting helpers for the `print_cache_item_size` diagnostics: accumulate
        // per-task-type byte sizes and item counts while a snapshot is being encoded,
        // for the stats table printed after persisting.
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            /// LZ4-compresses `data` and returns the compressed length, used to
            /// estimate the on-disk footprint of an encoded cache item.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            /// Records one encoded "data" category payload.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    // Best effort: a compression failure counts as 0 bytes.
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            /// Records one encoded "meta" category payload.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    // Best effort: a compression failure counts as 0 bytes.
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            /// Accumulates the task's graph/edge counts (uppers, collectibles,
            /// children, followers, …) and, if an output exists, its encoded size.
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    // Best effort: an encoding failure counts as 0 bytes.
                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            /// Human-readable task type name used as the key of the stats table;
            /// tasks without a persistent type are grouped under "<unknown>".
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            /// Sort key for the printed table: total bytes (compressed bytes when
            /// the `…_with_compressed` feature is enabled).
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            /// Combined data + meta sizes for display.
            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            /// Total "data" category size for display.
            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            /// Average "data" item size; `checked_div` yields 0 when no items were
            /// recorded (avoids division by zero).
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            /// Total "meta" category size for display.
            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            /// Average "meta" item size; `checked_div` yields 0 when no items were
            /// recorded (avoids division by zero).
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
1202 #[cfg(feature = "print_cache_item_size")]
1203 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1204 Mutex::new(FxHashMap::default());
1205
1206 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1213 let encode_category = |task_id: TaskId,
1214 data: &TaskStorage,
1215 category: SpecificTaskDataCategory,
1216 buffer: &mut TurboBincodeBuffer|
1217 -> Option<TurboBincodeBuffer> {
1218 match encode_task_data(task_id, data, category, buffer) {
1219 Ok(encoded) => {
1220 #[cfg(feature = "print_cache_item_size")]
1221 {
1222 let mut stats = task_cache_stats.lock();
1223 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1224 match category {
1225 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1226 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1227 }
1228 }
1229 Some(encoded)
1230 }
1231 Err(err) => {
1232 panic!(
1233 "Serializing task {} failed ({:?}): {:?}",
1234 self.debug_get_task_description(task_id),
1235 category,
1236 err
1237 );
1238 }
1239 }
1240 };
1241 if task_id.is_transient() {
1242 unreachable!("transient task_ids should never be enqueued to be persisted");
1243 }
1244
1245 let encode_meta = inner.flags.meta_modified();
1246 let encode_data = inner.flags.data_modified();
1247
1248 #[cfg(feature = "print_cache_item_size")]
1249 if encode_data || encode_meta {
1250 task_cache_stats
1251 .lock()
1252 .entry(TaskCacheStats::task_name(inner))
1253 .or_default()
1254 .add_counts(inner);
1255 }
1256
1257 let meta = if encode_meta {
1258 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1259 } else {
1260 None
1261 };
1262
1263 let data = if encode_data {
1264 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1265 } else {
1266 None
1267 };
1268 let task_type_hash = if inner.flags.new_task() {
1269 let task_type = inner.get_persistent_task_type().expect(
1270 "It is not possible for a new_task to not have a persistent_task_type. Task \
1271 creation for persistent tasks uses a single ExecutionContextImpl for \
1272 creating the task (which sets new_task) and connect_child (which sets \
1273 persistent_task_type) and take_snapshot waits for all operations to complete \
1274 or suspend before we start snapshotting. So task creation will always set \
1275 the task_type.",
1276 );
1277 Some(compute_task_type_hash(task_type))
1278 } else {
1279 None
1280 };
1281
1282 SnapshotItem {
1283 task_id,
1284 meta,
1285 data,
1286 task_type_hash,
1287 }
1288 };
1289
1290 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1291
1292 drop(snapshot_span);
1293 let snapshot_duration = start.elapsed();
1294 let task_count = task_snapshots.len();
1295
1296 if task_snapshots.is_empty() {
1297 std::hint::cold_path();
1300 return Some((snapshot_time, false));
1301 }
1302
1303 let persist_start = Instant::now();
1304 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1305 {
1306 if let Err(err) = self
1307 .backing_storage
1308 .save_snapshot(suspended_operations, task_snapshots)
1309 {
1310 eprintln!("Persisting failed: {err:?}");
1311 return None;
1312 }
1313 #[cfg(feature = "print_cache_item_size")]
1314 {
1315 let mut task_cache_stats = task_cache_stats
1316 .into_inner()
1317 .into_iter()
1318 .collect::<Vec<_>>();
1319 if !task_cache_stats.is_empty() {
1320 use turbo_tasks::util::FormatBytes;
1321
1322 use crate::utils::markdown_table::print_markdown_table;
1323
1324 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1325 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1326 });
1327
1328 println!(
1329 "Task cache stats: {}",
1330 FormatSizes {
1331 size: task_cache_stats
1332 .iter()
1333 .map(|(_, s)| s.data + s.meta)
1334 .sum::<usize>(),
1335 #[cfg(feature = "print_cache_item_size_with_compressed")]
1336 compressed_size: task_cache_stats
1337 .iter()
1338 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1339 .sum::<usize>()
1340 },
1341 );
1342
1343 print_markdown_table(
1344 [
1345 "Task",
1346 " Total Size",
1347 " Data Size",
1348 " Data Count x Avg",
1349 " Data Count x Avg",
1350 " Meta Size",
1351 " Meta Count x Avg",
1352 " Meta Count x Avg",
1353 " Uppers",
1354 " Coll",
1355 " Agg Coll",
1356 " Children",
1357 " Followers",
1358 " Coll Deps",
1359 " Agg Dirty",
1360 " Output Size",
1361 ],
1362 task_cache_stats.iter(),
1363 |(task_desc, stats)| {
1364 [
1365 task_desc.to_string(),
1366 format!(" {}", stats.format_total()),
1367 format!(" {}", stats.format_data()),
1368 format!(" {} x", stats.data_count),
1369 format!("{}", stats.format_avg_data()),
1370 format!(" {}", stats.format_meta()),
1371 format!(" {} x", stats.meta_count),
1372 format!("{}", stats.format_avg_meta()),
1373 format!(" {}", stats.upper_count),
1374 format!(" {}", stats.collectibles_count),
1375 format!(" {}", stats.aggregated_collectibles_count),
1376 format!(" {}", stats.children_count),
1377 format!(" {}", stats.followers_count),
1378 format!(" {}", stats.collectibles_dependents_count),
1379 format!(" {}", stats.aggregated_dirty_containers_count),
1380 format!(" {}", FormatBytes(stats.output_size)),
1381 ]
1382 },
1383 );
1384 }
1385 }
1386 }
1387
1388 let elapsed = start.elapsed();
1389 let persist_duration = persist_start.elapsed();
1390 if elapsed > Duration::from_secs(10) {
1392 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1393 "Finished writing to filesystem cache".to_string(),
1394 elapsed,
1395 )));
1396 }
1397
1398 let wall_start_ms = wall_start
1399 .duration_since(SystemTime::UNIX_EPOCH)
1400 .unwrap_or_default()
1401 .as_secs_f64()
1403 * 1000.0;
1404 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1405 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1406 "turbopack-persistence",
1407 wall_start_ms,
1408 wall_end_ms,
1409 vec![
1410 ("reason", serde_json::Value::from(reason)),
1411 (
1412 "snapshot_duration_ms",
1413 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1414 ),
1415 (
1416 "persist_duration_ms",
1417 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1418 ),
1419 ("task_count", serde_json::Value::from(task_count)),
1420 ],
1421 )));
1422
1423 Some((snapshot_time, true))
1424 }
1425
1426 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1427 if self.should_restore() {
1428 let uncompleted_operations = self
1432 .backing_storage
1433 .uncompleted_operations()
1434 .expect("Failed to get uncompleted operations");
1435 if !uncompleted_operations.is_empty() {
1436 let mut ctx = self.execute_context(turbo_tasks);
1437 for op in uncompleted_operations {
1438 op.execute(&mut ctx);
1439 }
1440 }
1441 }
1442
1443 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1446 let _span = trace_span!("persisting background job").entered();
1448 let _span = tracing::info_span!("thread").entered();
1449 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1450 }
1451 }
1452
    /// Signals that the backend is shutting down.
    fn stopping(&self) {
        // Set the flag before waking waiters (Release store) so that anyone
        // woken by the event observes `stopping == true`.
        self.stopping.store(true, Ordering::Release);
        // Wake all listeners waiting on the stopping event.
        self.stopping_event.notify(usize::MAX);
    }
1457
1458 #[allow(unused_variables)]
1459 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1460 #[cfg(feature = "verify_aggregation_graph")]
1461 {
1462 self.is_idle.store(false, Ordering::Release);
1463 self.verify_aggregation_graph(turbo_tasks, false);
1464 }
1465 if self.should_persist() {
1466 self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1467 }
1468 drop_contents(&self.task_cache);
1469 self.storage.drop_contents();
1470 if let Err(err) = self.backing_storage.shutdown() {
1471 println!("Shutting down failed: {err}");
1472 }
1473 }
1474
    /// Called when the turbo-tasks system becomes idle. Notifies waiters and,
    /// with `verify_aggregation_graph` enabled, schedules a delayed graph
    /// verification that is aborted if idling ends first.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Debounce: wait 5s of idleness; bail out early if idling ends.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag: idle may have ended between the sleep
                // completing and this point.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1502
    /// Called when the system leaves the idle state; cancels any pending
    /// idle-time verification and wakes listeners.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1508
    /// Looks up (or creates) the persistent task for `task_type` and connects it
    /// as a child of `parent_task`. Checks the in-memory cache first, then the
    /// backing storage, creating a fresh task id only on a full miss.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Fast path: already in the in-memory task cache.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            // Found in the backing storage; populate the in-memory cache unless a
            // concurrent caller beat us to it.
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            // Not in storage: either a concurrent caller inserted it into the
            // cache meanwhile (occupied), or we create a brand-new task id.
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    self.storage.initialize_new_task(task_id);
                    e.insert(task_type.clone(), task_id);
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        // New root tasks get the maximum aggregation number so they act as
        // aggregation roots.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1592
    /// Looks up (or creates) the transient task for `task_type` and connects it
    /// as a child of `parent_task`. Panics if a persistent parent tries to call
    /// a transient task, since that dependency could not be persisted.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // A persistent task must not depend on a transient one.
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        // Fast path: already in the in-memory task cache.
        if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // Slow path: re-check under the raw entry (a concurrent caller may have
        // inserted meanwhile), otherwise create a new transient task id.
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                self.storage.initialize_new_task(task_id);
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                // New root tasks get the maximum aggregation number so they act
                // as aggregation roots.
                if is_root {
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1668
1669 fn debug_trace_transient_task(
1672 &self,
1673 task_type: &CachedTaskType,
1674 cell_id: Option<CellId>,
1675 ) -> DebugTraceTransientTask {
1676 fn inner_id(
1679 backend: &TurboTasksBackendInner<impl BackingStorage>,
1680 task_id: TaskId,
1681 cell_type_id: Option<ValueTypeId>,
1682 visited_set: &mut FxHashSet<TaskId>,
1683 ) -> DebugTraceTransientTask {
1684 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1685 if visited_set.contains(&task_id) {
1686 let task_name = task_type.get_name();
1687 DebugTraceTransientTask::Collapsed {
1688 task_name,
1689 cell_type_id,
1690 }
1691 } else {
1692 inner_cached(backend, &task_type, cell_type_id, visited_set)
1693 }
1694 } else {
1695 DebugTraceTransientTask::Uncached { cell_type_id }
1696 }
1697 }
1698 fn inner_cached(
1699 backend: &TurboTasksBackendInner<impl BackingStorage>,
1700 task_type: &CachedTaskType,
1701 cell_type_id: Option<ValueTypeId>,
1702 visited_set: &mut FxHashSet<TaskId>,
1703 ) -> DebugTraceTransientTask {
1704 let task_name = task_type.get_name();
1705
1706 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1707 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1708 return None;
1712 };
1713 if task_id.is_transient() {
1714 Some(Box::new(inner_id(
1715 backend,
1716 task_id,
1717 cause_self_raw_vc.try_get_type_id(),
1718 visited_set,
1719 )))
1720 } else {
1721 None
1722 }
1723 });
1724 let cause_args = task_type
1725 .arg
1726 .get_raw_vcs()
1727 .into_iter()
1728 .filter_map(|raw_vc| {
1729 let Some(task_id) = raw_vc.try_get_task_id() else {
1730 return None;
1732 };
1733 if !task_id.is_transient() {
1734 return None;
1735 }
1736 Some((task_id, raw_vc.try_get_type_id()))
1737 })
1738 .collect::<IndexSet<_>>() .into_iter()
1740 .map(|(task_id, cell_type_id)| {
1741 inner_id(backend, task_id, cell_type_id, visited_set)
1742 })
1743 .collect();
1744
1745 DebugTraceTransientTask::Cached {
1746 task_name,
1747 cell_type_id,
1748 cause_self,
1749 cause_args,
1750 }
1751 }
1752 inner_cached(
1753 self,
1754 task_type,
1755 cell_id.map(|c| c.type_id),
1756 &mut FxHashSet::default(),
1757 )
1758 }
1759
1760 fn invalidate_task(
1761 &self,
1762 task_id: TaskId,
1763 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1764 ) {
1765 if !self.should_track_dependencies() {
1766 panic!("Dependency tracking is disabled so invalidation is not allowed");
1767 }
1768 operation::InvalidateOperation::run(
1769 smallvec![task_id],
1770 #[cfg(feature = "trace_task_dirty")]
1771 TaskDirtyCause::Invalidator,
1772 self.execute_context(turbo_tasks),
1773 );
1774 }
1775
1776 fn invalidate_tasks(
1777 &self,
1778 tasks: &[TaskId],
1779 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1780 ) {
1781 if !self.should_track_dependencies() {
1782 panic!("Dependency tracking is disabled so invalidation is not allowed");
1783 }
1784 operation::InvalidateOperation::run(
1785 tasks.iter().copied().collect(),
1786 #[cfg(feature = "trace_task_dirty")]
1787 TaskDirtyCause::Unknown,
1788 self.execute_context(turbo_tasks),
1789 );
1790 }
1791
1792 fn invalidate_tasks_set(
1793 &self,
1794 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1795 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1796 ) {
1797 if !self.should_track_dependencies() {
1798 panic!("Dependency tracking is disabled so invalidation is not allowed");
1799 }
1800 operation::InvalidateOperation::run(
1801 tasks.iter().copied().collect(),
1802 #[cfg(feature = "trace_task_dirty")]
1803 TaskDirtyCause::Unknown,
1804 self.execute_context(turbo_tasks),
1805 );
1806 }
1807
1808 fn invalidate_serialization(
1809 &self,
1810 task_id: TaskId,
1811 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1812 ) {
1813 if task_id.is_transient() {
1814 return;
1815 }
1816 let mut ctx = self.execute_context(turbo_tasks);
1817 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1818 task.invalidate_serialization();
1819 }
1820
1821 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1822 let task = self.storage.access_mut(task_id);
1823 if let Some(value) = task.get_persistent_task_type() {
1824 format!("{task_id:?} {}", value)
1825 } else if let Some(value) = task.get_transient_task_type() {
1826 format!("{task_id:?} {}", value)
1827 } else {
1828 format!("{task_id:?} unknown")
1829 }
1830 }
1831
1832 fn get_task_name(
1833 &self,
1834 task_id: TaskId,
1835 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1836 ) -> String {
1837 let mut ctx = self.execute_context(turbo_tasks);
1838 let task = ctx.task(task_id, TaskDataCategory::Data);
1839 if let Some(value) = task.get_persistent_task_type() {
1840 value.to_string()
1841 } else if let Some(value) = task.get_transient_task_type() {
1842 value.to_string()
1843 } else {
1844 "unknown".to_string()
1845 }
1846 }
1847
1848 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1849 let task = self.storage.access_mut(task_id);
1850 task.get_persistent_task_type().cloned()
1851 }
1852
    /// Called when a task's execution was canceled before completion. Wakes all
    /// waiters (done events and in-progress cell readers), marks the task
    /// session-dependent dirty (when tracked) and moves it to `Canceled`.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        // Wake whoever waits for this execution to finish, regardless of the
        // in-progress state it was in.
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Wake readers blocked on cells this execution was going to fill.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Mark the task dirty for this session so the canceled work is redone;
        // transient tasks and untracked mode skip dirty propagation.
        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
            task.update_dirty_state(Some(Dirtyness::SessionDependent))
        } else {
            None
        };

        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
        drop(task);

        // Run the aggregation update only after the task lock is released.
        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        drop(in_progress_cells);
    }
1902
    /// Tries to transition the task from `Scheduled` to `InProgress` and, on
    /// success, returns the span and future to drive its execution. Returns
    /// `None` when the task is not in progress or not currently scheduled.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // Prefetching drops and re-acquires the task lock, so the state is
            // re-read afterwards.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks, "prefetch");
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Only a `Scheduled` task may start executing; any other in-progress
            // state is put back untouched.
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move current collectibles to "outdated" so the new execution can
            // re-emit them; drop outdated entries that no longer have a current
            // counterpart.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            // Snapshot current dependencies as "outdated"; edges not re-created
            // during this execution are cleaned up on completion.
            if self.should_track_dependencies() {
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the execution future outside of the task lock.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can only be taken once; `None` here means
                    // it was already executed.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
2002
    /// Finalizes a finished task execution: stores the new output, invalidates
    /// dependents, connects new children and cleans up outdated state. Returns
    /// `true` when the execution was stale and the task must run again.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: compute the new output, collect new children, and determine
        // which dependents must be invalidated. `None` means the execution was
        // stale and has been rescheduled.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: invalidate tasks that depend on this task's (changed) output.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Phase 3: connect new children; this can also detect staleness.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 4: commit the output and in-progress state transition.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 5: drop outdated cells/state outside of the task lock.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2134
    /// First phase of completion: validates the in-progress state, handles the
    /// stale fast-path, reconciles cell counters, computes whether the task can
    /// become immutable, diffs the output, and cleans up outdated edges.
    /// Returns `None` when the execution was stale and has been rescheduled.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task's result is discarded; report an empty, successful
        // prepare so completion finishes without effects.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Stale fast-path: the task was invalidated while executing. Discard the
        // result, reschedule it, and release the active counts taken for any
        // new children that are not already connected.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Reconcile the per-cell-type max indices with the counters observed
        // during this execution (only for successful runs).
        if result.is_ok() {
            let old_counters: FxHashMap<_, _> = task
                .iter_cell_type_max_index()
                .map(|(&k, &v)| (k, v))
                .collect();
            let mut counters_to_remove = old_counters.clone();

            for (&cell_type, &max_index) in cell_counters.iter() {
                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                    if old_max_index != max_index {
                        task.insert_cell_type_max_index(cell_type, max_index);
                    }
                } else {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            }
            // Cell types not used by this execution are dropped.
            for (cell_type, _) in counters_to_remove {
                task.remove_cell_type_max_index(&cell_type);
            }
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        // A task may become immutable if it isn't already, isn't
        // session-dependent, has no invalidator, and has no collectible
        // dependencies — its own dependencies are checked below.
        let is_immutable = task.immutable();
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        // Children present before but not re-added by this execution become
        // outdated child edges.
        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Diff the new result against the stored output: `None` means the
        // output is unchanged and dependents need not be invalidated.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only if every task it depends on is
        // immutable as well.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2391
    /// Marks every task that depends on `task_id`'s output as dirty, after the
    /// output changed during execution completion.
    ///
    /// Dependent tasks are pre-fetched in bulk; above
    /// `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD` they are processed in
    /// parallel chunks on scoped worker threads, each with its own child
    /// context and aggregation queue.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
                "invalidate output dependents",
            );
        }

        // Decides for a single dependent whether it must be invalidated and,
        // if so, marks it dirty (queueing any aggregation updates).
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // Once tasks are never re-executed, so don't invalidate them.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                // Dependency edge is already outdated: mark dirty but don't
                // make the dependent stale.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                // No backward dependency edge at all: nothing to invalidate.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Many dependents: split into chunks and process on scoped
            // threads; each thread executes its own aggregation queue.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2489
    /// Marks all new children that have not produced an output yet as dirty,
    /// so they will be (re)executed.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(
            new_children.iter().copied(),
            "unfinished children dirty",
            |child_task, ctx| {
                // A child without an output has never completed; mark it
                // dirty (non-stale) so it gets scheduled.
                if !child_task.has_output() {
                    let child_id = child_task.id();
                    make_task_dirty_internal(
                        child_task,
                        child_id,
                        false,
                        #[cfg(feature = "trace_task_dirty")]
                        TaskDirtyCause::InitialDirty,
                        &mut queue,
                        ctx,
                    );
                }
            },
        );

        queue.execute(ctx);
    }
2519
    /// Connects the children collected during execution to the task graph.
    ///
    /// Returns `true` when the task was stale and got rescheduled instead of
    /// connecting (the active counts added for the new children are reverted
    /// in that case); returns `false` otherwise, including when the task was
    /// canceled.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // A task that became stale while executing produced an outdated
        // result; reschedule it right away. Once tasks are exempt since they
        // never re-run.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // The new children are not connected in this round; revert the
            // active counts that were added for them.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2587
    /// Finalizes a completed execution: stores the new output, updates the
    /// dirty state, and notifies waiters.
    ///
    /// Returns whether the task must be rescheduled (it became stale while
    /// executing, or determinism verification requested a re-run) plus the
    /// in-progress cell states whose waiters have been notified.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Children must have been connected in an earlier phase.
        debug_assert!(new_children.is_empty());

        // Stale (non-once) tasks get rescheduled instead of finishing.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Keep the replaced output alive until after the task lock is
        // released (dropped further below).
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake all readers that were waiting on cells updated during this
        // execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        let new_dirtyness = if session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        // Under determinism verification, persistent tasks whose dirty state
        // changed or that set no output are re-run to detect divergence.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            done_event.notify(usize::MAX);
        }

        // Drop the old output outside of the task lock.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2690
2691 fn task_execution_completed_cleanup(
2692 &self,
2693 ctx: &mut impl ExecuteContext<'_>,
2694 task_id: TaskId,
2695 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2696 is_error: bool,
2697 ) -> Vec<SharedReference> {
2698 let mut task = ctx.task(task_id, TaskDataCategory::All);
2699 let mut removed_cell_data = Vec::new();
2700 if !is_error {
2706 let to_remove_persistent: Vec<_> = task
2713 .iter_persistent_cell_data()
2714 .filter_map(|(cell, _)| {
2715 cell_counters
2716 .get(&cell.type_id)
2717 .is_none_or(|start_index| cell.index >= *start_index)
2718 .then_some(*cell)
2719 })
2720 .collect();
2721
2722 let to_remove_transient: Vec<_> = task
2724 .iter_transient_cell_data()
2725 .filter_map(|(cell, _)| {
2726 cell_counters
2727 .get(&cell.type_id)
2728 .is_none_or(|start_index| cell.index >= *start_index)
2729 .then_some(*cell)
2730 })
2731 .collect();
2732 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2733 for cell in to_remove_persistent {
2734 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2735 removed_cell_data.push(data.into_untyped());
2736 }
2737 }
2738 for cell in to_remove_transient {
2739 if let Some(data) = task.remove_transient_cell_data(&cell) {
2740 removed_cell_data.push(data);
2741 }
2742 }
2743 let to_remove_hash: Vec<_> = task
2745 .iter_cell_data_hash()
2746 .filter_map(|(cell, _)| {
2747 cell_counters
2748 .get(&cell.type_id)
2749 .is_none_or(|start_index| cell.index >= *start_index)
2750 .then_some(*cell)
2751 })
2752 .collect();
2753 for cell in to_remove_hash {
2754 task.remove_cell_data_hash(&cell);
2755 }
2756 }
2757
2758 task.cleanup_after_execution();
2762
2763 drop(task);
2764
2765 removed_cell_data
2767 }
2768
    /// Runs a background job of the backend. The snapshot jobs wait for the
    /// snapshot interval (or an idle timeout, or a stop signal), persist a
    /// snapshot, optionally compact the database while idle, and then chain
    /// the next follow-up snapshot job.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        // First snapshot waits longer than the regular cycle.
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            // Wait until either the interval elapses, the
                            // backend starts stopping, or an idle period lasts
                            // long enough to snapshot early.
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        let snapshot =
                            this.snapshot_and_persist(background_span.id(), reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // Run a bounded number of database compaction
                            // passes while the system stays idle.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                // Non-blocking poll of the idle-end listener:
                                // `biased` ensures it is checked first, the
                                // ready() branch wins when it hasn't fired.
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    // Ok(true): more compaction work remains.
                                    Ok(true) => {}
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            // Without new data, loop again instead of
                            // scheduling a follow-up job.
                            if !new_data {
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2901
2902 fn try_read_own_task_cell(
2903 &self,
2904 task_id: TaskId,
2905 cell: CellId,
2906 options: ReadCellOptions,
2907 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2908 ) -> Result<TypedCellContent> {
2909 let mut ctx = self.execute_context(turbo_tasks);
2910 let task = ctx.task(task_id, TaskDataCategory::Data);
2911 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2912 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2913 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2914 } else {
2915 Ok(CellContent(None).into_typed(cell.type_id))
2916 }
2917 }
2918
    /// Collects all collectibles of `collectible_type` visible at `task_id`
    /// (aggregated from the subgraph plus the task's own), as a map from
    /// `RawVc` to count.
    ///
    /// The task is first upgraded to a root aggregation node so its
    /// aggregated data covers the whole subgraph. When `reader_id` is given,
    /// collectible dependency edges are recorded in both directions so the
    /// reader gets invalidated on changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Promote the task to a root aggregation node; loop until the
            // aggregation number update has taken effect.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Each aggregated collectible present in the subgraph (count > 0)
            // contributes 1.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // The task's own collectibles contribute their signed count.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Record the forward edge on the reader, unless an existing
            // outdated edge could simply be refreshed.
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2983
2984 fn emit_collectible(
2985 &self,
2986 collectible_type: TraitTypeId,
2987 collectible: RawVc,
2988 task_id: TaskId,
2989 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2990 ) {
2991 self.assert_valid_collectible(task_id, collectible);
2992
2993 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2994 panic!("Collectibles need to be resolved");
2995 };
2996 let cell = CellRef {
2997 task: collectible_task,
2998 cell,
2999 };
3000 operation::UpdateCollectibleOperation::run(
3001 task_id,
3002 CollectibleRef {
3003 collectible_type,
3004 cell,
3005 },
3006 1,
3007 self.execute_context(turbo_tasks),
3008 );
3009 }
3010
3011 fn unemit_collectible(
3012 &self,
3013 collectible_type: TraitTypeId,
3014 collectible: RawVc,
3015 count: u32,
3016 task_id: TaskId,
3017 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3018 ) {
3019 self.assert_valid_collectible(task_id, collectible);
3020
3021 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3022 panic!("Collectibles need to be resolved");
3023 };
3024 let cell = CellRef {
3025 task: collectible_task,
3026 cell,
3027 };
3028 operation::UpdateCollectibleOperation::run(
3029 task_id,
3030 CollectibleRef {
3031 collectible_type,
3032 cell,
3033 },
3034 -(i32::try_from(count).unwrap()),
3035 self.execute_context(turbo_tasks),
3036 );
3037 }
3038
    /// Stores new content for a cell of `task_id` by delegating to
    /// `UpdateCellOperation`.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3061
    /// Marks the currently executing task as session dependent.
    ///
    /// Ensures the task has at least the session-dependent aggregation
    /// number before flagging the in-progress state.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking there is no session-dependent
            // invalidation either.
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // Raise the aggregation number first; the task lock must be
            // released while the aggregation update runs.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3097
3098 fn mark_own_task_as_finished(
3099 &self,
3100 task: TaskId,
3101 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3102 ) {
3103 let mut ctx = self.execute_context(turbo_tasks);
3104 let mut task = ctx.task(task, TaskDataCategory::Data);
3105 if let Some(InProgressState::InProgress(box InProgressStateInner {
3106 marked_as_completed,
3107 ..
3108 })) = task.get_in_progress_mut()
3109 {
3110 *marked_as_completed = true;
3111 }
3116 }
3117
    /// Connects `task` as a child of `parent_task` (or without a parent when
    /// `None`), after validating the persistent-to-transient calling rule.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3127
3128 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3129 let task_id = self.transient_task_id_factory.get();
3130 {
3131 let mut task = self.storage.access_mut(task_id);
3132 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3133 }
3134 #[cfg(feature = "verify_aggregation_graph")]
3135 self.root_tasks.lock().insert(task_id);
3136 task_id
3137 }
3138
    /// Disposes a root task. If it still contains dirty work it is demoted
    /// but stays active until everything is clean; otherwise its activeness
    /// is dropped immediately and waiters are notified.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            // Dirty work remains: no longer a root, but keep active until
            // clean.
            if let Some(activeness_state) = task.get_activeness_mut() {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Already clean: release activeness and wake waiters.
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3163
3164 #[cfg(feature = "verify_aggregation_graph")]
3165 fn verify_aggregation_graph(
3166 &self,
3167 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3168 idle: bool,
3169 ) {
3170 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3171 return;
3172 }
3173 use std::{collections::VecDeque, env, io::stdout};
3174
3175 use crate::backend::operation::{get_uppers, is_aggregating_node};
3176
3177 let mut ctx = self.execute_context(turbo_tasks);
3178 let root_tasks = self.root_tasks.lock().clone();
3179
3180 for task_id in root_tasks.into_iter() {
3181 let mut queue = VecDeque::new();
3182 let mut visited = FxHashSet::default();
3183 let mut aggregated_nodes = FxHashSet::default();
3184 let mut collectibles = FxHashMap::default();
3185 let root_task_id = task_id;
3186 visited.insert(task_id);
3187 aggregated_nodes.insert(task_id);
3188 queue.push_back(task_id);
3189 let mut counter = 0;
3190 while let Some(task_id) = queue.pop_front() {
3191 counter += 1;
3192 if counter % 100000 == 0 {
3193 println!(
3194 "queue={}, visited={}, aggregated_nodes={}",
3195 queue.len(),
3196 visited.len(),
3197 aggregated_nodes.len()
3198 );
3199 }
3200 let task = ctx.task(task_id, TaskDataCategory::All);
3201 if idle && !self.is_idle.load(Ordering::Relaxed) {
3202 return;
3203 }
3204
3205 let uppers = get_uppers(&task);
3206 if task_id != root_task_id
3207 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3208 {
3209 panic!(
3210 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3211 {:?})",
3212 task_id,
3213 task.get_task_description(),
3214 uppers
3215 );
3216 }
3217
3218 for (collectible, _) in task.iter_aggregated_collectibles() {
3219 collectibles
3220 .entry(*collectible)
3221 .or_insert_with(|| (false, Vec::new()))
3222 .1
3223 .push(task_id);
3224 }
3225
3226 for (&collectible, &value) in task.iter_collectibles() {
3227 if value > 0 {
3228 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3229 *flag = true
3230 } else {
3231 panic!(
3232 "Task {} has a collectible {:?} that is not in any upper task",
3233 task_id, collectible
3234 );
3235 }
3236 }
3237 }
3238
3239 let is_dirty = task.has_dirty();
3240 let has_dirty_container = task.has_dirty_containers();
3241 let should_be_in_upper = is_dirty || has_dirty_container;
3242
3243 let aggregation_number = get_aggregation_number(&task);
3244 if is_aggregating_node(aggregation_number) {
3245 aggregated_nodes.insert(task_id);
3246 }
3247 for child_id in task.iter_children() {
3254 if visited.insert(child_id) {
3256 queue.push_back(child_id);
3257 }
3258 }
3259 drop(task);
3260
3261 if should_be_in_upper {
3262 for upper_id in uppers {
3263 let upper = ctx.task(upper_id, TaskDataCategory::All);
3264 let in_upper = upper
3265 .get_aggregated_dirty_containers(&task_id)
3266 .is_some_and(|&dirty| dirty > 0);
3267 if !in_upper {
3268 let containers: Vec<_> = upper
3269 .iter_aggregated_dirty_containers()
3270 .map(|(&k, &v)| (k, v))
3271 .collect();
3272 let upper_task_desc = upper.get_task_description();
3273 drop(upper);
3274 panic!(
3275 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3276 ({})\nThese dirty containers are present:\n{:#?}",
3277 task_id,
3278 ctx.task(task_id, TaskDataCategory::Data)
3279 .get_task_description(),
3280 upper_id,
3281 upper_task_desc,
3282 containers,
3283 );
3284 }
3285 }
3286 }
3287 }
3288
3289 for (collectible, (flag, task_ids)) in collectibles {
3290 if !flag {
3291 use std::io::Write;
3292 let mut stdout = stdout().lock();
3293 writeln!(
3294 stdout,
3295 "{:?} that is not emitted in any child task but in these aggregated \
3296 tasks: {:#?}",
3297 collectible,
3298 task_ids
3299 .iter()
3300 .map(|t| format!(
3301 "{t} {}",
3302 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3303 ))
3304 .collect::<Vec<_>>()
3305 )
3306 .unwrap();
3307
3308 let task_id = collectible.cell.task;
3309 let mut queue = {
3310 let task = ctx.task(task_id, TaskDataCategory::All);
3311 get_uppers(&task)
3312 };
3313 let mut visited = FxHashSet::default();
3314 for &upper_id in queue.iter() {
3315 visited.insert(upper_id);
3316 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3317 }
3318 while let Some(task_id) = queue.pop() {
3319 let task = ctx.task(task_id, TaskDataCategory::All);
3320 let desc = task.get_task_description();
3321 let aggregated_collectible = task
3322 .get_aggregated_collectibles(&collectible)
3323 .copied()
3324 .unwrap_or_default();
3325 let uppers = get_uppers(&task);
3326 drop(task);
3327 writeln!(
3328 stdout,
3329 "upper {task_id} {desc} collectible={aggregated_collectible}"
3330 )
3331 .unwrap();
3332 if task_ids.contains(&task_id) {
3333 writeln!(
3334 stdout,
3335 "Task has an upper connection to an aggregated task that doesn't \
3336 reference it. Upper connection is invalid!"
3337 )
3338 .unwrap();
3339 }
3340 for upper_id in uppers {
3341 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3342 if !visited.contains(&upper_id) {
3343 queue.push(upper_id);
3344 }
3345 }
3346 }
3347 panic!("See stdout for more details");
3348 }
3349 }
3350 }
3351 }
3352
3353 fn assert_not_persistent_calling_transient(
3354 &self,
3355 parent_id: Option<TaskId>,
3356 child_id: TaskId,
3357 cell_id: Option<CellId>,
3358 ) {
3359 if let Some(parent_id) = parent_id
3360 && !parent_id.is_transient()
3361 && child_id.is_transient()
3362 {
3363 self.panic_persistent_calling_transient(
3364 self.debug_get_task_description(parent_id),
3365 self.debug_get_cached_task_type(child_id).as_deref(),
3366 cell_id,
3367 );
3368 }
3369 }
3370
3371 fn panic_persistent_calling_transient(
3372 &self,
3373 parent: String,
3374 child: Option<&CachedTaskType>,
3375 cell_id: Option<CellId>,
3376 ) {
3377 let transient_reason = if let Some(child) = child {
3378 Cow::Owned(format!(
3379 " The callee is transient because it depends on:\n{}",
3380 self.debug_trace_transient_task(child, cell_id),
3381 ))
3382 } else {
3383 Cow::Borrowed("")
3384 };
3385 panic!(
3386 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3387 parent,
3388 child.map_or("unknown", |t| t.get_name()),
3389 transient_reason,
3390 );
3391 }
3392
    /// Validates that `collectible` is a resolved task cell and that a
    /// transient collectible is not emitted from a persistent task; panics
    /// with a descriptive message otherwise.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Not a resolved cell; include the producing task's description
            // when it can be determined.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            // Explain why the collectible is transient when the cached task
            // type is available.
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3424}
3425
3426impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Forwarded to the inner backend: startup hook.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    /// Forwarded to the inner backend: shutdown has been requested.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    /// Forwarded to the inner backend: final shutdown.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    /// Forwarded to the inner backend: the system became idle.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    /// Forwarded to the inner backend: the system stopped being idle.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3446
    /// Forwarded to the inner backend: look up or create a persistent task.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    /// Forwarded to the inner backend: look up or create a transient task.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    /// Forwarded to the inner backend: invalidate a single task.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    /// Forwarded to the inner backend: invalidate a slice of tasks.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    /// Forwarded to the inner backend: invalidate a set of tasks.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    /// Forwarded to the inner backend: invalidate a task's serialization.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    /// Forwarded to the inner backend: the task's execution was canceled.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }
3494
    /// Attempts to begin executing `task_id` at the given priority; delegates
    /// to the inner backend, which returns `None` when execution cannot be
    /// started (e.g. the task is not currently scheduled to run — TODO
    /// confirm against the inner implementation).
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }

    /// Records the result of a finished task execution; delegates to the
    /// inner backend. The `stateful` flag only exists (and is only
    /// forwarded) when the `verify_determinism` feature is enabled.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3524
    // The job type processed by `run_backend_job`.
    type BackendJob = TurboTasksBackendJob;

    /// Runs a backend job asynchronously; delegates to the inner backend,
    /// which returns the boxed future directly.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }
3534
    /// Tries to read a task's output; delegates to the inner backend. The
    /// inner `Err` side of the nested result is an `EventListener` to await
    /// before retrying.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }

    /// Tries to read a cell of a task; delegates to the inner backend.
    /// NOTE(review): the inner method takes `reader` before `cell`, the
    /// opposite of this trait signature's order — types differ so this
    /// compiles only as written; presumably intentional, but worth
    /// confirming against the inner signature.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }

    /// Reads a cell of the currently executing task itself (no `reader`
    /// registration needed); delegates to the inner backend.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }
3568
    /// Reads the collectibles of the given trait type emitted below
    /// `task_id`; delegates to the inner backend. The returned map carries
    /// per-collectible counts (`i32`).
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    /// Emits a collectible from `task_id`; delegates to the inner backend.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    /// Retracts `count` previously emitted collectibles from `task_id`;
    /// delegates to the inner backend.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }
3602
    /// Stores new content into a cell of `task_id`; delegates to the inner
    /// backend, forwarding all serialization/verification metadata
    /// (`content_hash`, `updated_key_hashes`, `verification_mode`) unchanged.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            turbo_tasks,
        );
    }
3625
    /// Marks the currently executing task as finished; delegates to the
    /// inner backend.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    /// Marks the currently executing task as session-dependent; delegates to
    /// the inner backend.
    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    /// Connects `task` to an optional parent task; delegates to the inner
    /// backend.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }
3650
    /// Creates a new transient (root) task; delegates to the inner backend,
    /// which does not need the turbo-tasks handle for this.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    /// Disposes a root task; delegates to the inner backend.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    /// Exposes the inner backend's task-statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }

    /// Whether dependency tracking is enabled, as configured via the inner
    /// backend's options.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }

    /// Returns a human-readable name for `task`; delegates to the inner
    /// backend.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3674}
3675
/// A node in a debug trace of transient tasks, rendered by `fmt_indented`
/// below. NOTE(review): field semantics are inferred from how they are
/// formatted; confirm against the construction sites.
enum DebugTraceTransientTask {
    /// A task known to the cache, including the (recursively traced) causes
    /// via its `self` argument and its other arguments.
    Cached {
        task_name: &'static str,
        // Type of the cell that was read, if any (rendered as a suffix).
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// Like `Cached`, but with cause details omitted — rendered with a
    /// " (collapsed)" suffix.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task whose name is not known; rendered as "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3692
3693impl DebugTraceTransientTask {
3694 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3695 let indent = " ".repeat(level);
3696 f.write_str(&indent)?;
3697
3698 fn fmt_cell_type_id(
3699 f: &mut fmt::Formatter<'_>,
3700 cell_type_id: Option<ValueTypeId>,
3701 ) -> fmt::Result {
3702 if let Some(ty) = cell_type_id {
3703 write!(
3704 f,
3705 " (read cell of type {})",
3706 get_value_type(ty).ty.global_name
3707 )
3708 } else {
3709 Ok(())
3710 }
3711 }
3712
3713 match self {
3715 Self::Cached {
3716 task_name,
3717 cell_type_id,
3718 ..
3719 }
3720 | Self::Collapsed {
3721 task_name,
3722 cell_type_id,
3723 ..
3724 } => {
3725 f.write_str(task_name)?;
3726 fmt_cell_type_id(f, *cell_type_id)?;
3727 if matches!(self, Self::Collapsed { .. }) {
3728 f.write_str(" (collapsed)")?;
3729 }
3730 }
3731 Self::Uncached { cell_type_id } => {
3732 f.write_str("unknown transient task")?;
3733 fmt_cell_type_id(f, *cell_type_id)?;
3734 }
3735 }
3736 f.write_char('\n')?;
3737
3738 if let Self::Cached {
3740 cause_self,
3741 cause_args,
3742 ..
3743 } = self
3744 {
3745 if let Some(c) = cause_self {
3746 writeln!(f, "{indent} self:")?;
3747 c.fmt_indented(f, level + 1)?;
3748 }
3749 if !cause_args.is_empty() {
3750 writeln!(f, "{indent} args:")?;
3751 for c in cause_args {
3752 c.fmt_indented(f, level + 1)?;
3753 }
3754 }
3755 }
3756 Ok(())
3757 }
3758}
3759
impl fmt::Display for DebugTraceTransientTask {
    /// Renders the full trace tree starting at indentation level 0.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3765
/// Returns an `Instant` roughly 30 years from now, used as an
/// effectively-never deadline. (Presumably kept well below platform
/// `Instant` limits to avoid overflow — confirm if changing the span.)
fn far_future() -> Instant {
    // 30 years expressed in seconds (ignoring leap days; precision is
    // irrelevant at this horizon).
    const THIRTY_YEARS_SECS: u64 = 86400 * 365 * 30;
    Instant::now() + Duration::from_secs(THIRTY_YEARS_SECS)
}
3774
/// Serializes the given `category` of `data` for `task` into a new buffer.
///
/// `scratch_buffer` is cleared and used as the encoder's working storage so
/// callers can reuse its allocation across calls; the returned buffer is a
/// copy of the encoded bytes.
///
/// With the `verify_serialization` feature enabled, the encoded bytes are
/// immediately decoded into a fresh `TaskStorage` to assert round-trip
/// consistency, failing with context naming the category and task.
fn encode_task_data(
    task: TaskId,
    data: &TaskStorage,
    category: SpecificTaskDataCategory,
    scratch_buffer: &mut TurboBincodeBuffer,
) -> Result<TurboBincodeBuffer> {
    scratch_buffer.clear();
    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
    data.encode(category, &mut encoder)?;

    if cfg!(feature = "verify_serialization") {
        // Round-trip check: freshly encoded data must decode successfully.
        TaskStorage::new()
            .decode(
                category,
                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
            )
            .with_context(|| {
                format!(
                    "expected to be able to decode serialized data for '{category:?}' information \
                     for {task}"
                )
            })?;
    }
    Ok(SmallVec::from_slice(scratch_buffer))
}
3810}