1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17};
18
19use anyhow::{Context, Result, bail};
20use auto_hash_map::{AutoMap, AutoSet};
21use indexmap::IndexSet;
22use parking_lot::{Condvar, Mutex};
23use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
24use smallvec::{SmallVec, smallvec};
25use tokio::time::{Duration, Instant};
26use tracing::{Span, trace_span};
27use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
28use turbo_tasks::{
29 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
30 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
31 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
32 backend::{
33 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
34 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
35 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
36 VerificationMode,
37 },
38 event::{Event, EventDescription, EventListener},
39 message_queue::TimingEvent,
40 registry::get_value_type,
41 scope::scope_and_block,
42 task_statistics::TaskStatisticsApi,
43 trace::TraceRawVcs,
44 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
45};
46
47pub use self::{
48 operation::AnyOperation,
49 storage::{SpecificTaskDataCategory, TaskDataCategory},
50};
51#[cfg(feature = "trace_task_dirty")]
52use crate::backend::operation::TaskDirtyCause;
53use crate::{
54 backend::{
55 operation::{
56 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
57 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
58 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
59 TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
60 is_root_node, make_task_dirty_internal, prepare_new_children,
61 },
62 storage::Storage,
63 storage_schema::{TaskStorage, TaskStorageAccessors},
64 },
65 backing_storage::BackingStorage,
66 data::{
67 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
68 InProgressState, InProgressStateInner, OutputValue, TransientTask,
69 },
70 error::TaskError,
71 utils::{
72 arc_or_owned::ArcOrOwned,
73 chunked_vec::ChunkedVec,
74 dash_map_drop_contents::drop_contents,
75 dash_map_raw_entry::{RawEntry, raw_entry},
76 ptr_eq_arc::PtrEqArc,
77 shard_amount::compute_shard_amount,
78 sharded::Sharded,
79 swap_retain,
80 },
81};
82
/// Threshold above which work on dependent tasks is presumably chunked and
/// parallelized rather than done inline — TODO confirm at the usage site
/// (not visible in this chunk).
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// Highest bit of `in_progress_operations`: set while a snapshot has been
/// requested, signalling running operations to suspend themselves.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// Idle duration before a snapshot is taken. Defaults to 2 seconds; can be
/// overridden (in milliseconds) via `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`.
/// Unparsable values silently fall back to the default.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
99
/// Shared state used to coordinate a snapshot with in-flight operations.
/// Guarded by `TurboTasksBackendInner::snapshot_request`.
struct SnapshotRequest {
    /// `true` while a snapshot is being taken; operations must suspend.
    snapshot_requested: bool,
    /// Operations that have parked themselves until the snapshot completes.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
104
105impl SnapshotRequest {
106 fn new() -> Self {
107 Self {
108 snapshot_requested: false,
109 suspended_operations: FxHashSet::default(),
110 }
111 }
112}
113
/// Controls whether the backend restores from and/or persists to the backing
/// storage (see `should_restore` / `should_persist`).
pub enum StorageMode {
    /// Data is restored from storage, but modifications are never persisted.
    ReadOnly,
    /// Data is restored and modifications are persisted.
    ReadWrite,
    /// Data is restored; modifications are presumably only written back on
    /// shutdown — confirm against the shutdown path (not visible here).
    ReadWriteOnShutdown,
}
124
/// Configuration for `TurboTasksBackend`.
pub struct BackendOptions {
    /// Whether to track dependencies between tasks. Disabling this also
    /// forces `active_tracking` off (see `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    /// Whether to track task activeness. Requires `dependency_tracking`.
    pub active_tracking: bool,

    /// If and how task data is persisted. `None` disables both restoring
    /// and persisting.
    pub storage_mode: Option<StorageMode>,

    /// Worker count used to compute the shard amount of internal sharded
    /// data structures; `None` lets `compute_shard_amount` pick a default.
    pub num_workers: Option<usize>,

    /// Use smaller initial allocations (passed to sharding and storage
    /// setup), presumably to reduce startup memory usage.
    pub small_preallocation: bool,
}
149
150impl Default for BackendOptions {
151 fn default() -> Self {
152 Self {
153 dependency_tracking: true,
154 active_tracking: true,
155 storage_mode: Some(StorageMode::ReadWrite),
156 num_workers: None,
157 small_preallocation: false,
158 }
159 }
160}
161
/// Background job kinds handled by the backend. Presumably distinguishes the
/// first snapshot after startup from later ones — confirm at the job runner
/// (not visible in this chunk).
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
}
166
/// Public handle to the backend: a reference-counted wrapper around the
/// shared inner state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
168
/// Sharded log of newly cached task types and their assigned ids, collected
/// so they can be drained and persisted during the next snapshot.
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
170
/// Shared state of the backend; wrapped in an `Arc` by `TurboTasksBackend`.
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    /// Creation time of the backend, used as a reference point for elapsed
    /// time measurements.
    start_time: Instant,

    /// Id factory for persistable tasks; starts after the ids already handed
    /// out by the backing storage and stays below `TRANSIENT_TASK_BIT`.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks; these ids have `TRANSIENT_TASK_BIT`
    /// set.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of new task cache entries to persist; `None` when the storage
    /// mode does not persist modifications.
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// Maps cached task types to their assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// In-memory task data storage.
    storage: Storage,

    /// `true` when the backing storage already contained tasks at startup
    /// (see `new`), i.e. the in-memory state is only a partial view.
    local_is_partial: AtomicBool,

    /// Count of currently running operations. The top bit
    /// (`SNAPSHOT_REQUESTED_BIT`) is set while a snapshot is requested.
    in_progress_operations: AtomicUsize,

    /// Snapshot coordination state; see `SnapshotRequest`.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Notified when all in-progress operations have suspended for a
    /// snapshot.
    operations_suspended: Condvar,
    /// Notified when a requested snapshot has completed and operations may
    /// resume.
    snapshot_completed: Condvar,
    /// Timestamp of the last snapshot. NOTE(review): presumably milliseconds
    /// relative to `start_time` — confirm at the update site (not in this
    /// chunk).
    last_snapshot: AtomicU64,

    /// Set when the backend is shutting down.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Per-native-function cache hit/miss statistics.
    task_statistics: TaskStatisticsApi,

    /// The persistent storage backend.
    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
219
220impl<B: BackingStorage> TurboTasksBackend<B> {
221 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
222 Self(Arc::new(TurboTasksBackendInner::new(
223 options,
224 backing_storage,
225 )))
226 }
227
228 pub fn backing_storage(&self) -> &B {
229 &self.0.backing_storage
230 }
231}
232
233impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Reads the next free task id from `backing_storage` so that newly
    /// created persistent tasks don't collide with ids already stored, and
    /// derives the shard amount for internal data structures from the
    /// configured worker count. Disabling `dependency_tracking` forces
    /// `active_tracking` off as well.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // A task cache log is only needed when modifications get persisted.
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            // Persistent task ids live below the transient bit...
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            // ...transient task ids live at or above it.
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            // If the storage already handed out ids, the in-memory state is
            // only a partial view of the persisted task data.
            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }
279
    /// Creates a fresh execution context bound to this backend and the given
    /// turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
286
    /// Creates an execution context that reuses an existing backing-storage
    /// read transaction instead of opening a new one.
    ///
    /// # Safety
    ///
    /// NOTE(review): inherits the safety requirements of
    /// `ExecuteContextImpl::new_with_tx` (presumably that `tx` stays valid
    /// for the context's lifetime) — confirm against its definition.
    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }
301
302 fn suspending_requested(&self) -> bool {
303 self.should_persist()
304 && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
305 }
306
    /// Cooperative suspension point for long-running operations.
    ///
    /// When a snapshot has been requested, the current operation state is
    /// captured via `suspend`, registered as suspended, and the thread blocks
    /// until the snapshot completes. Otherwise this is a cheap atomic check.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock: the snapshot may have completed in the
            // meantime.
            if snapshot_request.snapshot_requested {
                // Make this operation's suspended state visible to the
                // snapshotting thread so it can be persisted.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                // Only the request bit left => we were the last active
                // operation; wake the waiting snapshotting thread.
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                // Block until the snapshot completes.
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                // Resume: re-register as in-progress and deregister the
                // suspended operation.
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }
339
    /// Registers the start of an operation and returns a guard that keeps
    /// the `in_progress_operations` counter incremented until dropped.
    ///
    /// If a snapshot is currently requested, blocks until it completes so
    /// snapshots observe a quiescent state. When persistence is disabled no
    /// tracking is needed and an empty guard is returned.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            // A snapshot is (or was) requested; re-check under the lock.
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                // Undo our increment; if only the request bit remains, we
                // were the last operation the snapshotting thread waits for.
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                // Wait for the snapshot to finish, then re-register.
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }
363
364 fn should_persist(&self) -> bool {
365 matches!(
366 self.options.storage_mode,
367 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
368 )
369 }
370
    /// Whether task data should be restored from the backing storage (any
    /// configured storage mode implies restoring).
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// Whether dependencies between tasks are tracked.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Whether task activeness is tracked. Always `false` when dependency
    /// tracking is disabled (enforced in `new`).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
382
383 fn track_cache_hit(&self, task_type: &CachedTaskType) {
384 self.task_statistics
385 .map(|stats| stats.increment_cache_hit(task_type.native_fn));
386 }
387
388 fn track_cache_miss(&self, task_type: &CachedTaskType) {
389 self.task_statistics
390 .map(|stats| stats.increment_cache_miss(task_type.native_fn));
391 }
392
    /// Converts a stored `TaskError` into a `TurboTasksExecutionError`,
    /// recursively converting nested error sources.
    ///
    /// For `TaskError::TaskChain`, the actual error is looked up from the
    /// output of the last task in the chain (it may have been recomputed and
    /// no longer be available), then wrapped in one `TaskContext` layer per
    /// task in the chain, from last to first.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                let task_id = chain.last().unwrap();
                // Look up the error from the last task's output; scoped so
                // the task guard is released before recursing.
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                // The task may have been recomputed since; fall back to a
                // placeholder panic in that case.
                let error = error.map_or_else(
                    || {
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                // Wrap one TaskContext layer per chain element, innermost
                // last.
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
455}
456
/// RAII guard for an operation counted in `in_progress_operations`.
/// `backend` is `None` when persistence is disabled and no counting is
/// needed; otherwise dropping the guard decrements the counter (see the
/// `Drop` impl).
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
460
461impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
462 fn drop(&mut self) {
463 if let Some(backend) = self.backend {
464 let fetch_sub = backend
465 .in_progress_operations
466 .fetch_sub(1, Ordering::AcqRel);
467 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
468 backend.operations_suspended.notify_all();
469 }
470 }
471 }
472}
473
/// Data collected while preparing the completion of a task execution.
/// NOTE(review): field semantics inferred from names; the consuming code is
/// not visible in this chunk — confirm there.
struct TaskExecutionCompletePrepareResult {
    pub new_children: FxHashSet<TaskId>,
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    pub new_output: Option<OutputValue>,
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
483
484impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Like `connect_child`, but reuses an existing backing-storage read
    /// transaction for the execution context.
    ///
    /// # Safety
    ///
    /// NOTE(review): inherits the safety requirements of
    /// `execute_context_with_tx` (presumably that `tx` stays valid while the
    /// context is used) — confirm against `ExecuteContextImpl::new_with_tx`.
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: Option<TaskId>,
        child_task: TaskId,
        task_type: Option<ArcOrOwned<CachedTaskType>>,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, task_type, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }
501
502 fn connect_child(
503 &self,
504 parent_task: Option<TaskId>,
505 child_task: TaskId,
506 task_type: Option<ArcOrOwned<CachedTaskType>>,
507 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
508 ) {
509 operation::ConnectChildOperation::run(
510 parent_task,
511 child_task,
512 task_type,
513 self.execute_context(turbo_tasks),
514 );
515 }
516
517 fn try_read_task_output(
518 self: &Arc<Self>,
519 task_id: TaskId,
520 reader: Option<TaskId>,
521 options: ReadOutputOptions,
522 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
523 ) -> Result<Result<RawVc, EventListener>> {
524 self.assert_not_persistent_calling_transient(reader, task_id, None);
525
526 let mut ctx = self.execute_context(turbo_tasks);
527 let need_reader_task = if self.should_track_dependencies()
528 && !matches!(options.tracking, ReadTracking::Untracked)
529 && reader.is_some_and(|reader_id| reader_id != task_id)
530 && let Some(reader_id) = reader
531 && reader_id != task_id
532 {
533 Some(reader_id)
534 } else {
535 None
536 };
537 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
538 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
542 (task, Some(reader))
543 } else {
544 (ctx.task(task_id, TaskDataCategory::All), None)
545 };
546
547 fn listen_to_done_event(
548 reader_description: Option<EventDescription>,
549 tracking: ReadTracking,
550 done_event: &Event,
551 ) -> EventListener {
552 done_event.listen_with_note(move || {
553 move || {
554 if let Some(reader_description) = reader_description.as_ref() {
555 format!(
556 "try_read_task_output from {} ({})",
557 reader_description, tracking
558 )
559 } else {
560 format!("try_read_task_output ({})", tracking)
561 }
562 }
563 })
564 }
565
566 fn check_in_progress(
567 task: &impl TaskGuard,
568 reader_description: Option<EventDescription>,
569 tracking: ReadTracking,
570 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
571 {
572 match task.get_in_progress() {
573 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
574 listen_to_done_event(reader_description, tracking, done_event),
575 ))),
576 Some(InProgressState::InProgress(box InProgressStateInner {
577 done_event, ..
578 })) => Some(Ok(Err(listen_to_done_event(
579 reader_description,
580 tracking,
581 done_event,
582 )))),
583 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
584 "{} was canceled",
585 task.get_task_description()
586 ))),
587 None => None,
588 }
589 }
590
591 if matches!(options.consistency, ReadConsistency::Strong) {
592 loop {
594 let aggregation_number = get_aggregation_number(&task);
595 if is_root_node(aggregation_number) {
596 break;
597 }
598 drop(task);
599 drop(reader_task);
600 {
601 let _span = tracing::trace_span!(
602 "make root node for strongly consistent read",
603 %task_id
604 )
605 .entered();
606 AggregationUpdateQueue::run(
607 AggregationUpdateJob::UpdateAggregationNumber {
608 task_id,
609 base_aggregation_number: u32::MAX,
610 distance: None,
611 },
612 &mut ctx,
613 );
614 }
615 (task, reader_task) = if let Some(reader_id) = need_reader_task {
616 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
618 (task, Some(reader))
619 } else {
620 (ctx.task(task_id, TaskDataCategory::All), None)
621 }
622 }
623
624 let is_dirty = task.is_dirty();
625
626 let has_dirty_containers = task.has_dirty_containers();
628 if has_dirty_containers || is_dirty.is_some() {
629 let activeness = task.get_activeness_mut();
630 let mut task_ids_to_schedule: Vec<_> = Vec::new();
631 let activeness = if let Some(activeness) = activeness {
633 activeness.set_active_until_clean();
637 activeness
638 } else {
639 if ctx.should_track_activeness() {
643 task_ids_to_schedule = task.dirty_containers().collect();
645 task_ids_to_schedule.push(task_id);
646 }
647 let activeness =
648 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
649 activeness.set_active_until_clean();
650 activeness
651 };
652 let listener = activeness.all_clean_event.listen_with_note(move || {
653 let this = self.clone();
654 let tt = turbo_tasks.pin();
655 move || {
656 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
657 let mut ctx = this.execute_context(tt);
658 let mut visited = FxHashSet::default();
659 fn indent(s: &str) -> String {
660 s.split_inclusive('\n')
661 .flat_map(|line: &str| [" ", line].into_iter())
662 .collect::<String>()
663 }
664 fn get_info(
665 ctx: &mut impl ExecuteContext<'_>,
666 task_id: TaskId,
667 parent_and_count: Option<(TaskId, i32)>,
668 visited: &mut FxHashSet<TaskId>,
669 ) -> String {
670 let task = ctx.task(task_id, TaskDataCategory::All);
671 let is_dirty = task.is_dirty();
672 let in_progress =
673 task.get_in_progress()
674 .map_or("not in progress", |p| match p {
675 InProgressState::InProgress(_) => "in progress",
676 InProgressState::Scheduled { .. } => "scheduled",
677 InProgressState::Canceled => "canceled",
678 });
679 let activeness = task.get_activeness().map_or_else(
680 || "not active".to_string(),
681 |activeness| format!("{activeness:?}"),
682 );
683 let aggregation_number = get_aggregation_number(&task);
684 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
685 {
686 let uppers = get_uppers(&task);
687 !uppers.contains(&parent_task_id)
688 } else {
689 false
690 };
691
692 let has_dirty_containers = task.has_dirty_containers();
694
695 let task_description = task.get_task_description();
696 let is_dirty_label = if let Some(parent_priority) = is_dirty {
697 format!(", dirty({parent_priority})")
698 } else {
699 String::new()
700 };
701 let has_dirty_containers_label = if has_dirty_containers {
702 ", dirty containers"
703 } else {
704 ""
705 };
706 let count = if let Some((_, count)) = parent_and_count {
707 format!(" {count}")
708 } else {
709 String::new()
710 };
711 let mut info = format!(
712 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
713 {in_progress}, \
714 {activeness}{is_dirty_label}{has_dirty_containers_label})",
715 );
716 let children: Vec<_> = task.dirty_containers_with_count().collect();
717 drop(task);
718
719 if missing_upper {
720 info.push_str("\n ERROR: missing upper connection");
721 }
722
723 if has_dirty_containers || !children.is_empty() {
724 writeln!(info, "\n dirty tasks:").unwrap();
725
726 for (child_task_id, count) in children {
727 let task_description = ctx
728 .task(child_task_id, TaskDataCategory::Data)
729 .get_task_description();
730 if visited.insert(child_task_id) {
731 let child_info = get_info(
732 ctx,
733 child_task_id,
734 Some((task_id, count)),
735 visited,
736 );
737 info.push_str(&indent(&child_info));
738 if !info.ends_with('\n') {
739 info.push('\n');
740 }
741 } else {
742 writeln!(
743 info,
744 " {child_task_id} {task_description} {count} \
745 (already visited)"
746 )
747 .unwrap();
748 }
749 }
750 }
751 info
752 }
753 let info = get_info(&mut ctx, task_id, None, &mut visited);
754 format!(
755 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
756 )
757 }
758 });
759 drop(reader_task);
760 drop(task);
761 if !task_ids_to_schedule.is_empty() {
762 let mut queue = AggregationUpdateQueue::new();
763 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
764 queue.execute(&mut ctx);
765 }
766
767 return Ok(Err(listener));
768 }
769 }
770
771 let reader_description = reader_task
772 .as_ref()
773 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
774 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
775 {
776 return value;
777 }
778
779 if let Some(output) = task.get_output() {
780 let result = match output {
781 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
782 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
783 OutputValue::Error(error) => Err(error.clone()),
784 };
785 if let Some(mut reader_task) = reader_task.take()
786 && options.tracking.should_track(result.is_err())
787 && (!task.immutable() || cfg!(feature = "verify_immutable"))
788 {
789 #[cfg(feature = "trace_task_output_dependencies")]
790 let _span = tracing::trace_span!(
791 "add output dependency",
792 task = %task_id,
793 dependent_task = ?reader
794 )
795 .entered();
796 let mut queue = LeafDistanceUpdateQueue::new();
797 let reader = reader.unwrap();
798 if task.add_output_dependent(reader) {
799 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
801 let reader_leaf_distance =
802 reader_task.get_leaf_distance().copied().unwrap_or_default();
803 if reader_leaf_distance.distance <= leaf_distance.distance {
804 queue.push(
805 reader,
806 leaf_distance.distance,
807 leaf_distance.max_distance_in_buffer,
808 );
809 }
810 }
811
812 drop(task);
813
814 if !reader_task.remove_outdated_output_dependencies(&task_id) {
820 let _ = reader_task.add_output_dependencies(task_id);
821 }
822 drop(reader_task);
823
824 queue.execute(&mut ctx);
825 } else {
826 drop(task);
827 }
828
829 return result.map_err(|error| {
830 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
831 .with_task_context(task_id, turbo_tasks.pin())
832 .into()
833 });
834 }
835 drop(reader_task);
836
837 let note = EventDescription::new(|| {
838 move || {
839 if let Some(reader) = reader_description.as_ref() {
840 format!("try_read_task_output (recompute) from {reader}",)
841 } else {
842 "try_read_task_output (recompute, untracked)".to_string()
843 }
844 }
845 });
846
847 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
849 TaskExecutionReason::OutputNotAvailable,
850 EventDescription::new(|| task.get_task_desc_fn()),
851 note,
852 );
853
854 let old = task.set_in_progress(in_progress_state);
857 debug_assert!(old.is_none(), "InProgress already exists");
858 ctx.schedule_task(task, TaskPriority::Initial);
859
860 Ok(Err(listener))
861 }
862
    /// Tries to read cell `cell` of task `task_id`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the task has to (re)compute the cell first,
    /// and `Err(..)` when the cell no longer exists or the task was
    /// canceled. When `reader` is set and the read is tracked, a cell
    /// dependency edge is recorded on both sides.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the cell dependency on both the read task (dependents) and
        // the reader (dependencies); skipped for immutable tasks unless the
        // `verify_immutable` feature is enabled.
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // If the dependency was already recorded (and marked
                // outdated), un-outdate it instead of inserting a new edge.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // The reader task is only locked when dependencies are tracked, the
        // read is tracked, and the reader differs from the read task.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read may take the content out of the cell instead of
        // keeping it around.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No content: if the task is already scheduled or running, just wait
        // for it to finish.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Check whether the cell can exist at all for this task; otherwise a
        // recomputation would not produce it either.
        // NOTE(review): `max_id` is treated as an exclusive upper bound for
        // valid indices here — confirm against `get_cell_type_max_index`'s
        // definition.
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        // The cell should exist after recomputation: register a listener;
        // only the first listener schedules the task.
        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).global_name,
            cell_index = cell.index
        )
        .entered();

        // A canceled task will never recompute the cell.
        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
1000
1001 fn listen_to_cell(
1002 &self,
1003 task: &mut impl TaskGuard,
1004 task_id: TaskId,
1005 reader_task: &Option<impl TaskGuard>,
1006 cell: CellId,
1007 ) -> (EventListener, bool) {
1008 let note = || {
1009 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
1010 move || {
1011 if let Some(reader_desc) = reader_desc.as_ref() {
1012 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
1013 } else {
1014 "try_read_task_cell (in progress, untracked)".to_string()
1015 }
1016 }
1017 };
1018 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
1019 let listener = in_progress.event.listen_with_note(note);
1021 return (listener, false);
1022 }
1023 let in_progress = InProgressCellState::new(task_id, cell);
1024 let listener = in_progress.event.listen_with_note(note);
1025 let old = task.insert_in_progress_cells(cell, in_progress);
1026 debug_assert!(old.is_none(), "InProgressCell already exists");
1027 (listener, true)
1028 }
1029
1030 fn snapshot_and_persist(
1031 &self,
1032 parent_span: Option<tracing::Id>,
1033 reason: &str,
1034 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1035 ) -> Option<(Instant, bool)> {
1036 let snapshot_span =
1037 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1038 .entered();
1039 let start = Instant::now();
1040 debug_assert!(self.should_persist());
1041
1042 let suspended_operations;
1043 {
1044 let _span = tracing::info_span!("blocking").entered();
1045 let mut snapshot_request = self.snapshot_request.lock();
1046 snapshot_request.snapshot_requested = true;
1047 let active_operations = self
1048 .in_progress_operations
1049 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1050 if active_operations != 0 {
1051 self.operations_suspended
1052 .wait_while(&mut snapshot_request, |_| {
1053 self.in_progress_operations.load(Ordering::Relaxed)
1054 != SNAPSHOT_REQUESTED_BIT
1055 });
1056 }
1057 suspended_operations = snapshot_request
1058 .suspended_operations
1059 .iter()
1060 .map(|op| op.arc().clone())
1061 .collect::<Vec<_>>();
1062 }
1063 self.storage.start_snapshot();
1064 let mut persisted_task_cache_log = self
1065 .persisted_task_cache_log
1066 .as_ref()
1067 .map(|l| l.take(|i| i))
1068 .unwrap_or_default();
1069 let mut snapshot_request = self.snapshot_request.lock();
1070 snapshot_request.snapshot_requested = false;
1071 self.in_progress_operations
1072 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1073 self.snapshot_completed.notify_all();
1074 let snapshot_time = Instant::now();
1075 drop(snapshot_request);
1076
1077 #[cfg(feature = "print_cache_item_size")]
1078 #[derive(Default)]
1079 struct TaskCacheStats {
1080 data: usize,
1081 data_compressed: usize,
1082 data_count: usize,
1083 meta: usize,
1084 meta_compressed: usize,
1085 meta_count: usize,
1086 upper_count: usize,
1087 collectibles_count: usize,
1088 aggregated_collectibles_count: usize,
1089 children_count: usize,
1090 followers_count: usize,
1091 collectibles_dependents_count: usize,
1092 aggregated_dirty_containers_count: usize,
1093 output_size: usize,
1094 }
1095 #[cfg(feature = "print_cache_item_size")]
1096 impl TaskCacheStats {
1097 fn compressed_size(data: &[u8]) -> Result<usize> {
1098 Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
1099 data,
1100 &mut Vec::new(),
1101 lzzzz::lz4::ACC_LEVEL_DEFAULT,
1102 )?)
1103 }
1104
1105 fn add_data(&mut self, data: &[u8]) {
1106 self.data += data.len();
1107 self.data_compressed += Self::compressed_size(data).unwrap_or(0);
1108 self.data_count += 1;
1109 }
1110
1111 fn add_meta(&mut self, data: &[u8]) {
1112 self.meta += data.len();
1113 self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
1114 self.meta_count += 1;
1115 }
1116
1117 fn add_counts(&mut self, storage: &TaskStorage) {
1118 let counts = storage.meta_counts();
1119 self.upper_count += counts.upper;
1120 self.collectibles_count += counts.collectibles;
1121 self.aggregated_collectibles_count += counts.aggregated_collectibles;
1122 self.children_count += counts.children;
1123 self.followers_count += counts.followers;
1124 self.collectibles_dependents_count += counts.collectibles_dependents;
1125 self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
1126 if let Some(output) = storage.get_output() {
1127 use turbo_bincode::turbo_bincode_encode;
1128
1129 self.output_size += turbo_bincode_encode(&output)
1130 .map(|data| data.len())
1131 .unwrap_or(0);
1132 }
1133 }
1134 }
1135 #[cfg(feature = "print_cache_item_size")]
1136 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1137 Mutex::new(FxHashMap::default());
1138
1139 let preprocess = |task_id: TaskId, inner: &TaskStorage| {
1140 if task_id.is_transient() {
1141 return (None, None);
1142 }
1143
1144 let meta_restored = inner.flags.meta_restored();
1145 let data_restored = inner.flags.data_restored();
1146
1147 let meta = meta_restored.then(|| inner.clone_meta_snapshot());
1149 let data = data_restored.then(|| inner.clone_data_snapshot());
1150
1151 (meta, data)
1152 };
1153 let process = |task_id: TaskId,
1154 (meta, data): (Option<TaskStorage>, Option<TaskStorage>),
1155 buffer: &mut TurboBincodeBuffer| {
1156 #[cfg(feature = "print_cache_item_size")]
1157 if let Some(ref m) = meta {
1158 task_cache_stats
1159 .lock()
1160 .entry(self.get_task_name(task_id, turbo_tasks))
1161 .or_default()
1162 .add_counts(m);
1163 }
1164 (
1165 task_id,
1166 meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
1167 data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
1168 )
1169 };
1170 let process_snapshot =
1171 |task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
1172 if task_id.is_transient() {
1173 return (task_id, None, None);
1174 }
1175
1176 #[cfg(feature = "print_cache_item_size")]
1177 if inner.flags.meta_modified() {
1178 task_cache_stats
1179 .lock()
1180 .entry(self.get_task_name(task_id, turbo_tasks))
1181 .or_default()
1182 .add_counts(&inner);
1183 }
1184
1185 (
1187 task_id,
1188 inner.flags.meta_modified().then(|| {
1189 encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
1190 }),
1191 inner.flags.data_modified().then(|| {
1192 encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
1193 }),
1194 )
1195 };
1196
1197 let snapshot = self
1198 .storage
1199 .take_snapshot(&preprocess, &process, &process_snapshot);
1200
1201 let task_snapshots = snapshot
1202 .into_iter()
1203 .filter_map(|iter| {
1204 let mut iter = iter
1205 .filter_map(
1206 |(task_id, meta, data): (
1207 _,
1208 Option<Result<SmallVec<_>>>,
1209 Option<Result<SmallVec<_>>>,
1210 )| {
1211 let meta = match meta {
1212 Some(Ok(meta)) => {
1213 #[cfg(feature = "print_cache_item_size")]
1214 task_cache_stats
1215 .lock()
1216 .entry(self.get_task_name(task_id, turbo_tasks))
1217 .or_default()
1218 .add_meta(&meta);
1219 Some(meta)
1220 }
1221 None => None,
1222 Some(Err(err)) => {
1223 println!(
1224 "Serializing task {} failed (meta): {:?}",
1225 self.debug_get_task_description(task_id),
1226 err
1227 );
1228 None
1229 }
1230 };
1231 let data = match data {
1232 Some(Ok(data)) => {
1233 #[cfg(feature = "print_cache_item_size")]
1234 task_cache_stats
1235 .lock()
1236 .entry(self.get_task_name(task_id, turbo_tasks))
1237 .or_default()
1238 .add_data(&data);
1239 Some(data)
1240 }
1241 None => None,
1242 Some(Err(err)) => {
1243 println!(
1244 "Serializing task {} failed (data): {:?}",
1245 self.debug_get_task_description(task_id),
1246 err
1247 );
1248 None
1249 }
1250 };
1251 (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
1252 },
1253 )
1254 .peekable();
1255 iter.peek().is_some().then_some(iter)
1256 })
1257 .collect::<Vec<_>>();
1258
1259 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1260
1261 drop(snapshot_span);
1262
1263 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1264 return Some((snapshot_time, false));
1265 }
1266
1267 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1268 {
1269 if let Err(err) = self.backing_storage.save_snapshot(
1270 suspended_operations,
1271 persisted_task_cache_log,
1272 task_snapshots,
1273 ) {
1274 eprintln!("Persisting failed: {err:?}");
1275 return None;
1276 }
1277 #[cfg(feature = "print_cache_item_size")]
1278 {
1279 let mut task_cache_stats = task_cache_stats
1280 .into_inner()
1281 .into_iter()
1282 .collect::<Vec<_>>();
1283 if !task_cache_stats.is_empty() {
1284 use turbo_tasks::util::FormatBytes;
1285
1286 use crate::utils::markdown_table::print_markdown_table;
1287
1288 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1289 (stats_b.data_compressed + stats_b.meta_compressed, key_b)
1290 .cmp(&(stats_a.data_compressed + stats_a.meta_compressed, key_a))
1291 });
1292 println!(
1293 "Task cache stats: {} ({})",
1294 FormatBytes(
1295 task_cache_stats
1296 .iter()
1297 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1298 .sum::<usize>()
1299 ),
1300 FormatBytes(
1301 task_cache_stats
1302 .iter()
1303 .map(|(_, s)| s.data + s.meta)
1304 .sum::<usize>()
1305 )
1306 );
1307
1308 print_markdown_table(
1309 [
1310 "Task",
1311 " Total Size",
1312 " Data Size",
1313 " Data Count x Avg",
1314 " Data Count x Avg",
1315 " Meta Size",
1316 " Meta Count x Avg",
1317 " Meta Count x Avg",
1318 " Uppers",
1319 " Coll",
1320 " Agg Coll",
1321 " Children",
1322 " Followers",
1323 " Coll Deps",
1324 " Agg Dirty",
1325 " Output Size",
1326 ],
1327 task_cache_stats.iter(),
1328 |(task_desc, stats)| {
1329 [
1330 task_desc.to_string(),
1331 format!(
1332 " {} ({})",
1333 FormatBytes(stats.data_compressed + stats.meta_compressed),
1334 FormatBytes(stats.data + stats.meta)
1335 ),
1336 format!(
1337 " {} ({})",
1338 FormatBytes(stats.data_compressed),
1339 FormatBytes(stats.data)
1340 ),
1341 format!(" {} x", stats.data_count,),
1342 format!(
1343 "{} ({})",
1344 FormatBytes(
1345 stats
1346 .data_compressed
1347 .checked_div(stats.data_count)
1348 .unwrap_or(0)
1349 ),
1350 FormatBytes(
1351 stats.data.checked_div(stats.data_count).unwrap_or(0)
1352 ),
1353 ),
1354 format!(
1355 " {} ({})",
1356 FormatBytes(stats.meta_compressed),
1357 FormatBytes(stats.meta)
1358 ),
1359 format!(" {} x", stats.meta_count,),
1360 format!(
1361 "{} ({})",
1362 FormatBytes(
1363 stats
1364 .meta_compressed
1365 .checked_div(stats.meta_count)
1366 .unwrap_or(0)
1367 ),
1368 FormatBytes(
1369 stats.meta.checked_div(stats.meta_count).unwrap_or(0)
1370 ),
1371 ),
1372 format!(" {}", stats.upper_count),
1373 format!(" {}", stats.collectibles_count),
1374 format!(" {}", stats.aggregated_collectibles_count),
1375 format!(" {}", stats.children_count),
1376 format!(" {}", stats.followers_count),
1377 format!(" {}", stats.collectibles_dependents_count),
1378 format!(" {}", stats.aggregated_dirty_containers_count),
1379 format!(" {}", FormatBytes(stats.output_size)),
1380 ]
1381 },
1382 );
1383 }
1384 }
1385 }
1386
1387 let elapsed = start.elapsed();
1388 if elapsed > Duration::from_secs(10) {
1390 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1391 "Finished writing to filesystem cache".to_string(),
1392 elapsed,
1393 )));
1394 }
1395
1396 Some((snapshot_time, true))
1397 }
1398
    /// One-time initialization when the backend starts up.
    ///
    /// When restoring from persisted storage, replays any operations that were
    /// interrupted (not fully applied) in the previous session so the
    /// persisted state becomes consistent again. Afterwards, if the storage
    /// mode allows writes, schedules the first background snapshot job.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Operations left incomplete when the previous process stopped;
            // they must be re-executed before normal operation resumes.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // NOTE(review): the second `_span` shadows the first, but both
            // guards stay entered until the end of this block — confirm the
            // double span is intentional.
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1425
    /// Signals that the backend is shutting down.
    ///
    /// The flag is stored (with `Release` ordering) before the event is
    /// notified, so waiters woken by `notify` observe `stopping == true`.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1430
    /// Final shutdown: persists a last snapshot (when persistence is enabled),
    /// drops in-memory task state, and shuts down the backing storage.
    ///
    /// Storage shutdown errors are reported to stdout rather than propagated,
    /// since shutdown is best-effort at this point.
    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            // Leave idle mode and run a final verification pass.
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            // Write a final snapshot so no completed work is lost.
            self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
        }
        // Free in-memory state before shutting the storage down.
        drop_contents(&self.task_cache);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            println!("Shutting down failed: {err}");
        }
    }
1447
    /// Called when the system becomes idle.
    ///
    /// Notifies idle listeners and, with the `verify_aggregation_graph`
    /// feature, schedules an aggregation-graph verification that only runs if
    /// the system stays idle for at least 5 seconds.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    // Debounce: require 5s of continuous idleness.
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    // Idle period ended before the timeout; skip verification.
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check in case idleness ended concurrently with the timer.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1475
    /// Called when the system stops being idle; clears the idle flag (under
    /// the verification feature) and wakes everything waiting for idle-end.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1481
    /// Returns the task id for a persistent (cached) task, creating and
    /// registering a new task when none exists yet for `task_type`.
    ///
    /// Lookup order: in-memory `task_cache` fast path, then (when restoring
    /// from partial local state) the backing storage, then creation of a new
    /// task id. In every case the task is connected as a child of
    /// `parent_task` before the id is returned.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Fast path: the task is already in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        // Only consult the backing storage when restoring and the local cache
        // is known to be incomplete.
        let check_backing_storage =
            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
        let tx = check_backing_storage
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let mut is_new = false;
        let (task_id, task_type) = {
            // SAFETY: presumably requires `tx` to remain alive while the
            // lookup runs — TODO confirm `forward_lookup_task_cache`'s
            // documented contract.
            if let Some(task_id) = unsafe {
                check_backing_storage
                    .then(|| {
                        self.backing_storage
                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
                            .expect("Failed to lookup task id")
                    })
                    .flatten()
            } {
                // Found in the backing storage: mirror the mapping into the
                // in-memory cache unless another thread already did.
                self.track_cache_hit(&task_type);
                let task_type = match raw_entry(&self.task_cache, &task_type) {
                    RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                    RawEntry::Vacant(e) => {
                        let task_type = Arc::new(task_type);
                        e.insert(task_type.clone(), task_id);
                        ArcOrOwned::Arc(task_type)
                    }
                };
                (task_id, task_type)
            } else {
                // Not persisted yet: insert into the in-memory cache, racing
                // against concurrent inserts for the same task type.
                let (task_id, mut task_type) = match raw_entry(&self.task_cache, &task_type) {
                    RawEntry::Occupied(e) => {
                        // Another thread created it in the meantime.
                        let task_id = *e.get();
                        drop(e);
                        self.track_cache_hit(&task_type);
                        (task_id, ArcOrOwned::Owned(task_type))
                    }
                    RawEntry::Vacant(e) => {
                        let task_type = Arc::new(task_type);
                        let task_id = self.persisted_task_id_factory.get();
                        e.insert(task_type.clone(), task_id);
                        self.track_cache_miss(&task_type);
                        is_new = true;
                        (task_id, ArcOrOwned::Arc(task_type))
                    }
                };
                // Record the mapping so it is written to persistent storage
                // with the next snapshot.
                if let Some(log) = &self.persisted_task_cache_log {
                    let task_type_arc: Arc<CachedTaskType> = Arc::from(task_type);
                    log.lock(task_id).push((task_type_arc.clone(), task_id));
                    task_type = ArcOrOwned::Arc(task_type_arc);
                }
                (task_id, task_type)
            }
        };

        // Newly created root functions get the maximum aggregation number so
        // they act as aggregation roots.
        if is_new && is_root {
            let mut ctx = self.execute_context(turbo_tasks);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }

        // SAFETY: presumably requires `tx` to remain alive for the duration of
        // the call — TODO confirm `connect_child_with_tx`'s contract.
        unsafe {
            self.connect_child_with_tx(
                tx.as_ref(),
                parent_task,
                task_id,
                Some(task_type),
                turbo_tasks,
            )
        };

        task_id
    }
1585
    /// Returns the task id for a transient (cached) task, creating and
    /// registering a new one when none exists yet for `task_type`.
    ///
    /// Panics (via `panic_persistent_calling_transient`) when a persistent
    /// parent tries to create a transient child, since persistent state must
    /// not depend on transient tasks. Transient tasks are never written to
    /// the backing storage.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        // Fast path: already cached.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // Slow path: take a cache entry to either observe a concurrent insert
        // or create the task ourselves.
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                // Another thread created it between the fast path and here.
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                // Root functions get the maximum aggregation number so they
                // act as aggregation roots.
                if is_root {
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1658
1659 fn debug_trace_transient_task(
1662 &self,
1663 task_type: &CachedTaskType,
1664 cell_id: Option<CellId>,
1665 ) -> DebugTraceTransientTask {
1666 fn inner_id(
1669 backend: &TurboTasksBackendInner<impl BackingStorage>,
1670 task_id: TaskId,
1671 cell_type_id: Option<ValueTypeId>,
1672 visited_set: &mut FxHashSet<TaskId>,
1673 ) -> DebugTraceTransientTask {
1674 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1675 if visited_set.contains(&task_id) {
1676 let task_name = task_type.get_name();
1677 DebugTraceTransientTask::Collapsed {
1678 task_name,
1679 cell_type_id,
1680 }
1681 } else {
1682 inner_cached(backend, &task_type, cell_type_id, visited_set)
1683 }
1684 } else {
1685 DebugTraceTransientTask::Uncached { cell_type_id }
1686 }
1687 }
1688 fn inner_cached(
1689 backend: &TurboTasksBackendInner<impl BackingStorage>,
1690 task_type: &CachedTaskType,
1691 cell_type_id: Option<ValueTypeId>,
1692 visited_set: &mut FxHashSet<TaskId>,
1693 ) -> DebugTraceTransientTask {
1694 let task_name = task_type.get_name();
1695
1696 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1697 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1698 return None;
1702 };
1703 if task_id.is_transient() {
1704 Some(Box::new(inner_id(
1705 backend,
1706 task_id,
1707 cause_self_raw_vc.try_get_type_id(),
1708 visited_set,
1709 )))
1710 } else {
1711 None
1712 }
1713 });
1714 let cause_args = task_type
1715 .arg
1716 .get_raw_vcs()
1717 .into_iter()
1718 .filter_map(|raw_vc| {
1719 let Some(task_id) = raw_vc.try_get_task_id() else {
1720 return None;
1722 };
1723 if !task_id.is_transient() {
1724 return None;
1725 }
1726 Some((task_id, raw_vc.try_get_type_id()))
1727 })
1728 .collect::<IndexSet<_>>() .into_iter()
1730 .map(|(task_id, cell_type_id)| {
1731 inner_id(backend, task_id, cell_type_id, visited_set)
1732 })
1733 .collect();
1734
1735 DebugTraceTransientTask::Cached {
1736 task_name,
1737 cell_type_id,
1738 cause_self,
1739 cause_args,
1740 }
1741 }
1742 inner_cached(
1743 self,
1744 task_type,
1745 cell_id.map(|c| c.type_id),
1746 &mut FxHashSet::default(),
1747 )
1748 }
1749
1750 fn invalidate_task(
1751 &self,
1752 task_id: TaskId,
1753 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1754 ) {
1755 if !self.should_track_dependencies() {
1756 panic!("Dependency tracking is disabled so invalidation is not allowed");
1757 }
1758 operation::InvalidateOperation::run(
1759 smallvec![task_id],
1760 #[cfg(feature = "trace_task_dirty")]
1761 TaskDirtyCause::Invalidator,
1762 self.execute_context(turbo_tasks),
1763 );
1764 }
1765
1766 fn invalidate_tasks(
1767 &self,
1768 tasks: &[TaskId],
1769 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1770 ) {
1771 if !self.should_track_dependencies() {
1772 panic!("Dependency tracking is disabled so invalidation is not allowed");
1773 }
1774 operation::InvalidateOperation::run(
1775 tasks.iter().copied().collect(),
1776 #[cfg(feature = "trace_task_dirty")]
1777 TaskDirtyCause::Unknown,
1778 self.execute_context(turbo_tasks),
1779 );
1780 }
1781
1782 fn invalidate_tasks_set(
1783 &self,
1784 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1785 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1786 ) {
1787 if !self.should_track_dependencies() {
1788 panic!("Dependency tracking is disabled so invalidation is not allowed");
1789 }
1790 operation::InvalidateOperation::run(
1791 tasks.iter().copied().collect(),
1792 #[cfg(feature = "trace_task_dirty")]
1793 TaskDirtyCause::Unknown,
1794 self.execute_context(turbo_tasks),
1795 );
1796 }
1797
1798 fn invalidate_serialization(
1799 &self,
1800 task_id: TaskId,
1801 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1802 ) {
1803 if task_id.is_transient() {
1804 return;
1805 }
1806 let mut ctx = self.execute_context(turbo_tasks);
1807 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1808 task.invalidate_serialization();
1809 }
1810
1811 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1812 let task = self.storage.access_mut(task_id);
1813 if let Some(value) = task.get_persistent_task_type() {
1814 format!("{task_id:?} {}", value)
1815 } else if let Some(value) = task.get_transient_task_type() {
1816 format!("{task_id:?} {}", value)
1817 } else {
1818 format!("{task_id:?} unknown")
1819 }
1820 }
1821
1822 fn get_task_name(
1823 &self,
1824 task_id: TaskId,
1825 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1826 ) -> String {
1827 let mut ctx = self.execute_context(turbo_tasks);
1828 let task = ctx.task(task_id, TaskDataCategory::Data);
1829 if let Some(value) = task.get_persistent_task_type() {
1830 value.to_string()
1831 } else if let Some(value) = task.get_transient_task_type() {
1832 value.to_string()
1833 } else {
1834 "unknown".to_string()
1835 }
1836 }
1837
1838 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1839 let task = self.storage.access_mut(task_id);
1840 task.get_persistent_task_type().cloned()
1841 }
1842
    /// Handles cancellation of a task's execution.
    ///
    /// Wakes any waiters on the task's `done_event` (the task will not
    /// produce a result) and replaces the in-progress state with `Canceled`
    /// so a later completion callback can detect the abandoned execution.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // Record the cancellation; `take_in_progress` above guarantees the
        // slot is empty at this point.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
    }
1865
    /// Attempts to transition a scheduled task into the `InProgress` state and
    /// produce the future to execute.
    ///
    /// Returns `None` when the task is not in the `Scheduled` state (e.g. it
    /// is already running or was canceled), or when a `Once` task's future
    /// has already been taken.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            if let Some(tasks) = task.prefetch() {
                // Warm up storage for related tasks; requires releasing and
                // re-acquiring this task's lock.
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                // Not scheduled: restore the state untouched and refuse to
                // start execution.
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move current collectibles into the "outdated" set; the new
            // execution re-emits the ones that are still valid. Outdated
            // entries with no matching current collectible are dropped.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                // Snapshot current dependencies as "outdated"; edges that are
                // not re-established by the new execution are cleaned up when
                // it completes.
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build the tracing span and the future outside the task lock.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can only be executed a single time;
                    // `take` yields `None` on any later attempt.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1965
    /// Post-processing after a task's future finished executing.
    ///
    /// Commits the result in phases: prepare (state checks, edge bookkeeping,
    /// output diffing), invalidation of tasks depending on a changed output,
    /// connecting newly discovered children, finishing the in-progress state,
    /// and finally cleanup of removed data.
    ///
    /// Returns `true` when the result was NOT committed because the execution
    /// became stale (the task will execute again); `false` otherwise.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Two span variants: a minimal one by default, a detailed one with
        // the `trace_task_details` feature.
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: validate state, diff the output, collect outdated edges.
        // `None` means the execution was stale and has been rescheduled.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: the output changed — invalidate everything that read it.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Phase 3: connect new children; this can also detect staleness.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 4: commit the output and clear the in-progress state.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        // Phase 5: drop data that is no longer referenced.
        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2097
    /// First phase of committing a finished execution.
    ///
    /// Validates the in-progress state, reschedules stale executions (fast
    /// stale path), syncs cell counters, collects outdated edges for cleanup,
    /// diffs the new output against the current one, and gathers the tasks
    /// that depend on the output when it changed.
    ///
    /// Returns `None` when the execution was stale and has been rescheduled;
    /// the caller must treat the result as discarded.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // Canceled executions return an empty result so the caller's later
        // phases run with no effects.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast stale path: the task was invalidated while running, so discard
        // this result and reschedule, undoing the active counts taken for
        // children discovered during the discarded run.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts;
            // only the not-yet-connected ones must be decremented.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Sync the per-cell-type max indices with the counters observed in
        // this execution: update changed entries, add new ones, remove those
        // no longer used.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        // Candidate for immutability: no session dependence, no invalidator,
        // no collectible dependencies. Collect this task's dependencies so we
        // can later check that they are all immutable too.
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            // Children from the previous execution that were not re-added are
            // outdated edges; re-added ones are removed from `new_children`.
            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Diff the execution result against the current output; `None` means
        // the output is unchanged and dependents need no invalidation.
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only when all of its dependencies are
        // already immutable.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2344
    /// Invalidates every task that depends on the output of `task_id` after that output
    /// changed.
    ///
    /// Small dependent sets are processed inline on `ctx`; when the count exceeds
    /// `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD`, the dependents are split into chunks
    /// and processed on scoped worker threads, each with its own child context and
    /// aggregation queue.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        if output_dependent_tasks.len() > 1 {
            // Prefetch the dependents' task data so the per-task work below doesn't stall.
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        // Marks a single dependent dirty unless it is a once-task or no backward dependency
        // edge to `task_id` exists (anymore).
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                // Once-tasks are skipped entirely.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // The edge is outdated: still mark dirty below, but without making it stale.
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Parallel path: one aggregation queue per chunk, executed on scoped threads.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Sequential path for small dependent sets.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2441
    /// Marks new children that have no output yet as dirty.
    ///
    /// A child without an output has not finished an execution, so it is flagged dirty
    /// (non-stale) and the resulting aggregation updates are applied.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
            if !child_task.has_output() {
                let child_id = child_task.id();
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    // make_stale
                    false,
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        });

        queue.execute(ctx);
    }
2467
    /// Connects the `new_children` discovered during execution to `task_id`.
    ///
    /// Returns `true` when the task was rescheduled instead, because it became stale while
    /// executing (in that case the children's active counts are decreased again). Returns
    /// `false` otherwise, including when the execution was canceled.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            // Canceled executions don't connect their children.
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while running, so reschedule it and
        // undo the active counts taken for the new children instead of connecting them.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2535
    /// Finalizes a completed execution of `task_id`: applies the new output, recomputes the
    /// task's self dirty state, and propagates an aggregation data update if needed.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule` is `true` when the task must run again (it became stale during
    ///   execution, or the `verify_determinism` re-run heuristic triggered).
    /// - `in_progress_cells` are the in-progress cell states whose waiters were notified.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // New children must have been handled (connected) in an earlier phase.
        debug_assert!(new_children.is_empty());

        // A stale (non-once) task is rescheduled immediately instead of being finalized.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Store the new output; keep the replaced content alive until the lock is released.
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake all readers waiting on cells that were computed during this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Derive the old self-dirty flags from the stored dirtyness state.
        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // A session-dependent task stays dirty overall but clean for the current session.
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // Compute the aggregated dirty/clean delta caused by the self-state change. The
        // container counts are passed unchanged; only the self flags differ old vs. new.
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            // NOTE(review): a negative effective dirty delta is treated as "the task became
            // clean", releasing active-until-clean waiters — confirm against the semantics
            // of ComputeDirtyAndCleanUpdate.
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                if let Some(activeness_state) = task.get_activeness_mut() {
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, persistent non-once tasks whose dirty state changed (or
        // that produced no output) are re-executed.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Only notify done-waiters when the task won't immediately run again.
            done_event.notify(usize::MAX);
        }

        // Drop the replaced output content after releasing the task lock.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2705
2706 fn task_execution_completed_cleanup(
2707 &self,
2708 ctx: &mut impl ExecuteContext<'_>,
2709 task_id: TaskId,
2710 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2711 is_error: bool,
2712 ) -> Vec<SharedReference> {
2713 let mut task = ctx.task(task_id, TaskDataCategory::All);
2714 let mut removed_cell_data = Vec::new();
2715 if !is_error {
2719 let to_remove_persistent: Vec<_> = task
2726 .iter_persistent_cell_data()
2727 .filter_map(|(cell, _)| {
2728 cell_counters
2729 .get(&cell.type_id)
2730 .is_none_or(|start_index| cell.index >= *start_index)
2731 .then_some(*cell)
2732 })
2733 .collect();
2734
2735 let to_remove_transient: Vec<_> = task
2737 .iter_transient_cell_data()
2738 .filter_map(|(cell, _)| {
2739 cell_counters
2740 .get(&cell.type_id)
2741 .is_none_or(|start_index| cell.index >= *start_index)
2742 .then_some(*cell)
2743 })
2744 .collect();
2745 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2746 for cell in to_remove_persistent {
2747 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2748 removed_cell_data.push(data.into_untyped());
2749 }
2750 }
2751 for cell in to_remove_transient {
2752 if let Some(data) = task.remove_transient_cell_data(&cell) {
2753 removed_cell_data.push(data);
2754 }
2755 }
2756 }
2757
2758 task.cleanup_after_execution();
2762
2763 drop(task);
2764
2765 removed_cell_data
2767 }
2768
    /// Runs a background job for this backend.
    ///
    /// The snapshot jobs loop until the backend stops: they wait until the snapshot
    /// deadline (or an earlier idle timeout) elapses, persist a snapshot, and — when new
    /// data was written — schedule a follow-up snapshot job and end this one.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        // The very first snapshot waits longer than the regular interval.
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // When already idle, allow snapshotting earlier via idle timeout.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    // Backend is stopping: abort the job.
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    // Became idle: move the idle deadline closer.
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    // Left idle: push the idle deadline past the interval.
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    // Regular snapshot deadline reached.
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    // Idle deadline reached; snapshot only if still idle.
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot_and_persist(None, reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if !new_data {
                                // Nothing was persisted; loop again without rescheduling.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            // Hand off to a fresh follow-up job and finish this one.
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2857
2858 fn try_read_own_task_cell(
2859 &self,
2860 task_id: TaskId,
2861 cell: CellId,
2862 options: ReadCellOptions,
2863 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2864 ) -> Result<TypedCellContent> {
2865 let mut ctx = self.execute_context(turbo_tasks);
2866 let task = ctx.task(task_id, TaskDataCategory::Data);
2867 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2868 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2869 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2870 } else {
2871 Ok(CellContent(None).into_typed(cell.type_id))
2872 }
2873 }
2874
    /// Reads all collectibles of `collectible_type` aggregated at `task_id`.
    ///
    /// The task is first upgraded to a root aggregation node so that its aggregated
    /// collectible set is complete; locally emitted collectibles are added on top. When a
    /// `reader_id` is given, dependency edges are recorded in both directions.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Raise the aggregation number until this task is a root node.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    // NOTE(review): aggregated collectibles contribute 1 regardless of their
                    // aggregated count (locals below contribute `count`) — confirm intended.
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Record the forward dependency edge on the reader as well.
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2939
2940 fn emit_collectible(
2941 &self,
2942 collectible_type: TraitTypeId,
2943 collectible: RawVc,
2944 task_id: TaskId,
2945 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2946 ) {
2947 self.assert_valid_collectible(task_id, collectible);
2948
2949 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2950 panic!("Collectibles need to be resolved");
2951 };
2952 let cell = CellRef {
2953 task: collectible_task,
2954 cell,
2955 };
2956 operation::UpdateCollectibleOperation::run(
2957 task_id,
2958 CollectibleRef {
2959 collectible_type,
2960 cell,
2961 },
2962 1,
2963 self.execute_context(turbo_tasks),
2964 );
2965 }
2966
2967 fn unemit_collectible(
2968 &self,
2969 collectible_type: TraitTypeId,
2970 collectible: RawVc,
2971 count: u32,
2972 task_id: TaskId,
2973 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2974 ) {
2975 self.assert_valid_collectible(task_id, collectible);
2976
2977 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2978 panic!("Collectibles need to be resolved");
2979 };
2980 let cell = CellRef {
2981 task: collectible_task,
2982 cell,
2983 };
2984 operation::UpdateCollectibleOperation::run(
2985 task_id,
2986 CollectibleRef {
2987 collectible_type,
2988 cell,
2989 },
2990 -(i32::try_from(count).unwrap()),
2991 self.execute_context(turbo_tasks),
2992 );
2993 }
2994
    /// Updates the content of a cell of `task_id` by running an `UpdateCellOperation`.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3015
    /// Marks the currently running task as session dependent.
    ///
    /// First ensures the task has at least the session-dependent aggregation number, then
    /// sets the `session_dependent` flag on its in-progress state. No-op when dependency
    /// tracking is disabled or the task is not in progress.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // Release the task lock while the aggregation number update runs.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3051
3052 fn mark_own_task_as_finished(
3053 &self,
3054 task: TaskId,
3055 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3056 ) {
3057 let mut ctx = self.execute_context(turbo_tasks);
3058 let mut task = ctx.task(task, TaskDataCategory::Data);
3059 if let Some(InProgressState::InProgress(box InProgressStateInner {
3060 marked_as_completed,
3061 ..
3062 })) = task.get_in_progress_mut()
3063 {
3064 *marked_as_completed = true;
3065 }
3070 }
3071
    /// Connects `task` as a child of `parent_task`, after asserting that a persistent
    /// parent does not connect to a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3081
3082 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3083 let task_id = self.transient_task_id_factory.get();
3084 {
3085 let mut task = self.storage.access_mut(task_id);
3086 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3087 }
3088 #[cfg(feature = "verify_aggregation_graph")]
3089 self.root_tasks.lock().insert(task_id);
3090 task_id
3091 }
3092
    /// Disposes a root task.
    ///
    /// When the task (or any aggregated container) is still dirty, it stops being a root
    /// but stays active until everything is clean; otherwise its activeness state is
    /// removed and all-clean waiters are notified.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            if let Some(activeness_state) = task.get_activeness_mut() {
                // No longer a root, but keep it active until everything is clean.
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3117
    /// Debug-only consistency check of the aggregation graph.
    ///
    /// Starting from every registered root task this walks all reachable tasks and checks:
    /// every task reports (transitively) to an aggregated node; dirty tasks are listed as
    /// dirty containers in their uppers; and every aggregated collectible is actually
    /// emitted by some reachable task. Panics with diagnostics on a violation. Can be
    /// disabled by setting `TURBO_ENGINE_VERIFY_GRAPH=0`.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        // `use` items in a block are in scope for the whole block, including above.
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // BFS over all tasks reachable from this root.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (seen emitted locally somewhere, tasks aggregating it)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    // Progress output for very large graphs.
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // Abort an idle-time check as soon as the system is no longer idle.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which tasks aggregate each collectible...
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // ...and mark collectibles that are actually emitted locally.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // Dirty tasks must appear as dirty containers in all of their uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Aggregated collectibles that no reachable task emitted: dump the upper chain
            // from the emitting cell's task for debugging, then panic.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3306
3307 fn assert_not_persistent_calling_transient(
3308 &self,
3309 parent_id: Option<TaskId>,
3310 child_id: TaskId,
3311 cell_id: Option<CellId>,
3312 ) {
3313 if let Some(parent_id) = parent_id
3314 && !parent_id.is_transient()
3315 && child_id.is_transient()
3316 {
3317 self.panic_persistent_calling_transient(
3318 self.debug_get_task_description(parent_id),
3319 self.debug_get_cached_task_type(child_id).as_deref(),
3320 cell_id,
3321 );
3322 }
3323 }
3324
3325 fn panic_persistent_calling_transient(
3326 &self,
3327 parent: String,
3328 child: Option<&CachedTaskType>,
3329 cell_id: Option<CellId>,
3330 ) {
3331 let transient_reason = if let Some(child) = child {
3332 Cow::Owned(format!(
3333 " The callee is transient because it depends on:\n{}",
3334 self.debug_trace_transient_task(child, cell_id),
3335 ))
3336 } else {
3337 Cow::Borrowed("")
3338 };
3339 panic!(
3340 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3341 parent,
3342 child.map_or("unknown", |t| t.get_name()),
3343 transient_reason,
3344 );
3345 }
3346
    /// Asserts that `collectible` is valid to be emitted from `task_id`.
    ///
    /// # Panics
    ///
    /// Panics when the collectible is not a resolved task cell, or when a transient
    /// collectible would be emitted from a persistent task.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Enrich the panic message with the producing task, when known.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3378}
3379
3380impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Delegates startup to the inner backend implementation.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3384
    /// Delegates the stopping notification to the inner backend implementation.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3388
    /// Delegates stop to the inner backend implementation.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3392
    /// Delegates the idle-start notification to the inner backend implementation.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3396
    /// Delegates the idle-end notification to the inner backend implementation.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3400
    /// Delegates persistent task creation/lookup to the inner backend implementation.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }
3410
    /// Delegates transient task creation/lookup to the inner backend implementation.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }
3420
    /// Delegates single-task invalidation to the inner backend implementation.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3424
    /// Delegates batch invalidation (slice form) to the inner backend implementation.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3428
    /// Delegates batch invalidation (set form) to the inner backend implementation.
    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }
3436
    /// Delegates serialization invalidation to the inner backend implementation.
    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }
3444
    /// Delegates execution cancellation to the inner backend implementation.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }
3448
    /// Delegates starting a task execution to the inner backend implementation.
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0
            .try_start_task_execution(task_id, priority, turbo_tasks)
    }
3458
    // Reports the result of a finished task execution to the inner backend.
    // The `stateful` flag only exists when the `verify_determinism` feature is
    // enabled; the returned bool is passed through from the inner
    // implementation unchanged.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }
3478
    // Job payload type accepted by `run_backend_job` below.
    type BackendJob = TurboTasksBackendJob;
3480
    // Returns the inner backend's boxed future that executes `job`.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }
3488
    // Delegates reading the output of `task_id` to the inner backend. The
    // inner `Err`-of-`Ok` carries an `EventListener` to await when the output
    // is not yet available.
    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, reader, options, turbo_tasks)
    }
3499
    // Delegates reading a cell of `task_id` to the inner backend. Note that
    // the inner method takes `reader` before `cell`, the opposite of this
    // trait signature's order (the distinct types keep this safe).
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
    }
3511
    // Delegates reading one of the task's own cells (no separate reader) to
    // the inner backend.
    fn try_read_own_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
    }
3522
    // Delegates reading the collectibles of `collectible_type` emitted by
    // `task_id` to the inner backend; returns a map of collectible to count.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }
3533
    // Forwards emission of a collectible from `task_id` to the inner backend.
    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }
3544
    // Forwards retraction of `count` previously emitted collectibles to the
    // inner backend.
    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }
3556
    // Forwards a cell-content update for `task_id` to the inner backend,
    // passing through serializability, optional updated key hashes, and the
    // verification mode unchanged.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(
            task_id,
            cell,
            is_serializable_cell_content,
            content,
            updated_key_hashes,
            verification_mode,
            turbo_tasks,
        );
    }
3577
    // Forwards marking the currently executing task as finished to the inner
    // backend.
    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }
3585
    // Forwards marking the currently executing task as session-dependent to
    // the inner backend.
    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }
3593
    // Forwards connecting `task` to an optional `parent_task` to the inner
    // backend.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }
3602
    // Creates a transient (root) task via the inner backend. The inner method
    // needs no API handle, so the `turbo_tasks` argument is unused.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }
3610
    // Forwards disposal of a root task to the inner backend.
    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }
3614
    // Exposes the inner backend's task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
3618
    // Reads the `dependency_tracking` flag from the inner backend's options.
    fn is_tracking_dependencies(&self) -> bool {
        self.0.options.dependency_tracking
    }
3622
    // Delegates resolving a human-readable name for `task` to the inner
    // backend.
    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
        self.0.get_task_name(task, turbo_tasks)
    }
3626}
3627
/// A node in a debug trace explaining where a transient task came from.
/// Rendered as an indented tree via `fmt_indented` / the `Display` impl below.
enum DebugTraceTransientTask {
    /// A transient task with a known name, including the causes that led to
    /// it (the `self` value and the call arguments).
    Cached {
        task_name: &'static str,
        // Present when the trace concerns a read of a cell of this type.
        cell_type_id: Option<ValueTypeId>,
        // Cause originating from the call's `self` value, if any.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        // Causes originating from the call's arguments.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// Like `Cached` but with its causes omitted; rendered with a
    /// "(collapsed)" suffix. NOTE(review): the collapse criterion is decided
    /// at the construction site, not visible here — confirm there.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A transient task whose identity could not be determined; rendered as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3644
impl DebugTraceTransientTask {
    /// Recursively writes this trace node and its causes as a text tree.
    /// Each node produces one line at `level` indentation; a `Cached` node's
    /// `self`/`args` causes follow as subtrees at `level + 1`.
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = "    ".repeat(level);
        f.write_str(&indent)?;

        // Appends " (read cell of type <name>)" when the cell type is known.
        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type(ty).global_name)
            } else {
                Ok(())
            }
        }

        // First line: task name (or placeholder) plus the cell-type note.
        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        // Only `Cached` nodes carry cause information; render it as subtrees.
        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}
3707
impl fmt::Display for DebugTraceTransientTask {
    // Renders the full trace tree, starting at indentation level 0.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}
3713
/// Returns an `Instant` roughly 30 years from now.
///
/// Used as an effectively-infinite deadline where "never" is needed. The
/// offset is kept well below the platform's representable range so the
/// addition does not overflow.
fn far_future() -> Instant {
    // 30 years in seconds: 60s * 60m * 24h * 365d * 30y.
    const THIRTY_YEARS: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);
    Instant::now() + THIRTY_YEARS
}
3722
/// Serializes the `category` slice of `data` for `task` into a new buffer.
///
/// `scratch_buffer` is cleared and used as the encoder's working space; the
/// encoded bytes are then copied out into a freshly allocated
/// `TurboBincodeBuffer`, leaving the scratch buffer reusable by the caller.
///
/// With the `verify_serialization` feature enabled, the encoded bytes are
/// immediately decoded into a fresh `TaskStorage` to catch
/// non-round-trippable data early; the error context names the task and the
/// data category.
fn encode_task_data(
    task: TaskId,
    data: &TaskStorage,
    category: SpecificTaskDataCategory,
    scratch_buffer: &mut TurboBincodeBuffer,
) -> Result<TurboBincodeBuffer> {
    scratch_buffer.clear();
    let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
    data.encode(category, &mut encoder)?;

    if cfg!(feature = "verify_serialization") {
        TaskStorage::new()
            .decode(
                category,
                &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
            )
            .with_context(|| {
                format!(
                    "expected to be able to decode serialized data for '{category:?}' information \
                     for {task}"
                )
            })?;
    }
    Ok(SmallVec::from_slice(scratch_buffer))
}
3751}