1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33 backend::{
34 Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
35 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37 VerificationMode,
38 },
39 event::{Event, EventDescription, EventListener},
40 message_queue::{TimingEvent, TraceEvent},
41 registry::get_value_type,
42 scope::scope_and_block,
43 task_statistics::TaskStatisticsApi,
44 trace::TraceRawVcs,
45 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49 operation::AnyOperation,
50 storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55 backend::{
56 operation::{
57 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58 CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
59 LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
60 connect_children, get_aggregation_number, get_uppers, is_root_node,
61 make_task_dirty_internal, prepare_new_children,
62 },
63 storage::Storage,
64 storage_schema::{TaskStorage, TaskStorageAccessors},
65 },
66 backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
67 data::{
68 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69 InProgressState, InProgressStateInner, OutputValue, TransientTask,
70 },
71 error::TaskError,
72 utils::{
73 arc_or_owned::ArcOrOwned,
74 dash_map_drop_contents::drop_contents,
75 dash_map_raw_entry::{RawEntry, raw_entry},
76 ptr_eq_arc::PtrEqArc,
77 shard_amount::compute_shard_amount,
78 },
79};
80
/// Threshold of 10000 items related to marking dependent tasks dirty.
///
/// NOTE(review): the code that consumes this constant is not visible in this part of the file;
/// presumably work above this size is processed in parallel — confirm at the use site.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
85
/// Most significant bit of a `usize`.
///
/// It is OR-ed into `in_progress_operations` while a snapshot is requested; the remaining bits of
/// that counter hold the number of currently running operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
87
/// Timeout read lazily (on first access) from the `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS`
/// environment variable, interpreted as milliseconds.
///
/// Falls back to two seconds when the variable is unset or cannot be parsed as a `u64`.
/// NOTE(review): where this timeout is consumed is not visible in this part of the file.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_millis = std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_millis {
        Some(millis) => Duration::from_millis(millis),
        None => Duration::from_secs(2),
    }
});
97
/// State shared between the snapshot routine and running operations; it is kept behind the
/// `snapshot_request` mutex of the backend.
struct SnapshotRequest {
    /// `true` while a snapshot has been requested and has not yet been released again.
    snapshot_requested: bool,
    /// Operations that registered themselves at a suspend point while a snapshot was requested.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
102
103impl SnapshotRequest {
104 fn new() -> Self {
105 Self {
106 snapshot_requested: false,
107 suspended_operations: FxHashSet::default(),
108 }
109 }
110}
111
/// How the backend interacts with its backing storage.
///
/// Any mode enables restoring (`should_restore`); only `ReadWrite` and `ReadWriteOnShutdown`
/// enable persisting (`should_persist`).
pub enum StorageMode {
    /// Restoring is enabled, persisting is not.
    ReadOnly,
    /// Restoring and persisting are enabled.
    /// NOTE(review): presumably changes are written while the backend is running — confirm.
    ReadWrite,
    /// Restoring and persisting are enabled.
    /// NOTE(review): the name suggests changes are written only on shutdown — confirm against the
    /// code that triggers snapshots.
    ReadWriteOnShutdown,
}
122
/// Configuration of a [`TurboTasksBackend`].
pub struct BackendOptions {
    /// Enables dependency tracking. When this is `false`, the backend constructor also forces
    /// `active_tracking` to `false`.
    pub dependency_tracking: bool,

    /// Enables activeness tracking. Only effective when `dependency_tracking` is enabled.
    pub active_tracking: bool,

    /// Storage mode; `None` disables both restoring and persisting.
    pub storage_mode: Option<StorageMode>,

    /// Optional worker count; passed to `compute_shard_amount` when sizing the storage.
    pub num_workers: Option<usize>,

    /// Passed to `compute_shard_amount` and `Storage::new`.
    /// NOTE(review): presumably requests a smaller initial allocation — confirm.
    pub small_preallocation: bool,
}
147
148impl Default for BackendOptions {
149 fn default() -> Self {
150 Self {
151 dependency_tracking: true,
152 active_tracking: true,
153 storage_mode: Some(StorageMode::ReadWrite),
154 num_workers: None,
155 small_preallocation: false,
156 }
157 }
158}
159
/// Jobs of the backend.
///
/// NOTE(review): the code that schedules and runs these jobs is not visible in this part of the
/// file; the descriptions below are derived from the variant names only.
pub enum TurboTasksBackendJob {
    /// Presumably the first snapshot job — confirm.
    InitialSnapshot,
    /// Presumably a snapshot job that follows a previous one — confirm.
    FollowUpSnapshot,
}
164
/// Public handle of the backend: a reference-counted pointer to the shared
/// `TurboTasksBackendInner` state.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
166
/// Shared state behind [`TurboTasksBackend`].
struct TurboTasksBackendInner<B: BackingStorage> {
    /// Backend configuration (after normalization in the constructor).
    options: BackendOptions,

    /// Instant captured when the backend was constructed.
    start_time: Instant,

    /// Id factory constructed with the backing storage's next free task id and
    /// `TRANSIENT_TASK_BIT - 1` as bounds.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory constructed with `TRANSIENT_TASK_BIT` and `TaskId::MAX` as bounds.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Maps a cached task type to its task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// Task storage, created with the computed shard amount.
    storage: Storage,

    /// Number of currently running operations; the most significant bit
    /// (`SNAPSHOT_REQUESTED_BIT`) is set while a snapshot is requested.
    in_progress_operations: AtomicUsize,

    /// Snapshot flag and set of suspended operations; also the mutex used with both condvars.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Notified when `in_progress_operations` drops to exactly `SNAPSHOT_REQUESTED_BIT`, i.e. no
    /// operation is running while a snapshot is requested.
    operations_suspended: Condvar,
    /// Operations waiting for a snapshot block on this condvar until `snapshot_requested` is
    /// cleared.
    snapshot_completed: Condvar,
    /// Initialized to 0.
    /// NOTE(review): presumably a timestamp of the last snapshot — confirm at the use site.
    last_snapshot: AtomicU64,

    /// Initialized to `false`.
    /// NOTE(review): presumably set once the backend is stopping — confirm at the use site.
    stopping: AtomicBool,
    /// Event named "TurboTasksBackend::stopping_event".
    stopping_event: Event,
    /// Event named "TurboTasksBackend::idle_start_event".
    idle_start_event: Event,
    /// Event named "TurboTasksBackend::idle_end_event".
    idle_end_event: Event,
    /// Only present with the `verify_aggregation_graph` feature; initialized to `false`.
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Statistics sink used to count cache hits and misses per native function.
    task_statistics: TaskStatisticsApi,

    /// The backing storage implementation supplied by the caller.
    backing_storage: B,

    /// Only present with the `verify_aggregation_graph` feature.
    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
211
212impl<B: BackingStorage> TurboTasksBackend<B> {
213 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
214 Self(Arc::new(TurboTasksBackendInner::new(
215 options,
216 backing_storage,
217 )))
218 }
219
220 pub fn backing_storage(&self) -> &B {
221 &self.0.backing_storage
222 }
223}
224
225impl<B: BackingStorage> TurboTasksBackendInner<B> {
226 pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
227 let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
228 if !options.dependency_tracking {
229 options.active_tracking = false;
230 }
231 let small_preallocation = options.small_preallocation;
232 let next_task_id = backing_storage
233 .next_free_task_id()
234 .expect("Failed to get task id");
235 Self {
236 options,
237 start_time: Instant::now(),
238 persisted_task_id_factory: IdFactoryWithReuse::new(
239 next_task_id,
240 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
241 ),
242 transient_task_id_factory: IdFactoryWithReuse::new(
243 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
244 TaskId::MAX,
245 ),
246 task_cache: FxDashMap::default(),
247 storage: Storage::new(shard_amount, small_preallocation),
248 in_progress_operations: AtomicUsize::new(0),
249 snapshot_request: Mutex::new(SnapshotRequest::new()),
250 operations_suspended: Condvar::new(),
251 snapshot_completed: Condvar::new(),
252 last_snapshot: AtomicU64::new(0),
253 stopping: AtomicBool::new(false),
254 stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
255 idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
256 idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
257 #[cfg(feature = "verify_aggregation_graph")]
258 is_idle: AtomicBool::new(false),
259 task_statistics: TaskStatisticsApi::default(),
260 backing_storage,
261 #[cfg(feature = "verify_aggregation_graph")]
262 root_tasks: Default::default(),
263 }
264 }
265
    /// Creates an execution context bound to this backend and the given `turbo_tasks` API.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }
272
273 fn suspending_requested(&self) -> bool {
274 self.should_persist()
275 && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
276 }
277
278 fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
279 #[cold]
280 fn operation_suspend_point_cold<B: BackingStorage>(
281 this: &TurboTasksBackendInner<B>,
282 suspend: impl FnOnce() -> AnyOperation,
283 ) {
284 let operation = Arc::new(suspend());
285 let mut snapshot_request = this.snapshot_request.lock();
286 if snapshot_request.snapshot_requested {
287 snapshot_request
288 .suspended_operations
289 .insert(operation.clone().into());
290 let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
291 assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
292 if value == SNAPSHOT_REQUESTED_BIT {
293 this.operations_suspended.notify_all();
294 }
295 this.snapshot_completed
296 .wait_while(&mut snapshot_request, |snapshot_request| {
297 snapshot_request.snapshot_requested
298 });
299 this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
300 snapshot_request
301 .suspended_operations
302 .remove(&operation.into());
303 }
304 }
305
306 if self.suspending_requested() {
307 operation_suspend_point_cold(self, suspend);
308 }
309 }
310
311 pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
312 if !self.should_persist() {
313 return OperationGuard { backend: None };
314 }
315 let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
316 if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
317 let mut snapshot_request = self.snapshot_request.lock();
318 if snapshot_request.snapshot_requested {
319 let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
320 if value == SNAPSHOT_REQUESTED_BIT {
321 self.operations_suspended.notify_all();
322 }
323 self.snapshot_completed
324 .wait_while(&mut snapshot_request, |snapshot_request| {
325 snapshot_request.snapshot_requested
326 });
327 self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
328 }
329 }
330 OperationGuard {
331 backend: Some(self),
332 }
333 }
334
335 fn should_persist(&self) -> bool {
336 matches!(
337 self.options.storage_mode,
338 Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
339 )
340 }
341
    /// Returns `true` when any storage mode is configured.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }
345
    /// Returns the `dependency_tracking` option.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }
349
    /// Returns the `active_tracking` option (already forced to `false` by the constructor when
    /// dependency tracking is disabled).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }
353
354 fn track_cache_hit(&self, task_type: &CachedTaskType) {
355 self.task_statistics
356 .map(|stats| stats.increment_cache_hit(task_type.native_fn));
357 }
358
359 fn track_cache_miss(&self, task_type: &CachedTaskType) {
360 self.task_statistics
361 .map(|stats| stats.increment_cache_miss(task_type.native_fn));
362 }
363
364 fn task_error_to_turbo_tasks_execution_error(
369 &self,
370 error: &TaskError,
371 ctx: &mut impl ExecuteContext<'_>,
372 ) -> TurboTasksExecutionError {
373 match error {
374 TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
375 TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
376 message: item.message.clone(),
377 source: item
378 .source
379 .as_ref()
380 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
381 })),
382 TaskError::LocalTaskContext(local_task_context) => {
383 TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
384 name: local_task_context.name.clone(),
385 source: local_task_context
386 .source
387 .as_ref()
388 .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
389 }))
390 }
391 TaskError::TaskChain(chain) => {
392 let task_id = chain.last().unwrap();
393 let error = {
394 let task = ctx.task(*task_id, TaskDataCategory::Meta);
395 if let Some(OutputValue::Error(error)) = task.get_output() {
396 Some(error.clone())
397 } else {
398 None
399 }
400 };
401 let error = error.map_or_else(
402 || {
403 TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
405 message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
406 "Error no longer available",
407 )),
408 location: None,
409 }))
410 },
411 |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
412 );
413 let mut current_error = error;
414 for &task_id in chain.iter().rev() {
415 current_error =
416 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
417 task_id,
418 source: Some(current_error),
419 turbo_tasks: ctx.turbo_tasks(),
420 }));
421 }
422 current_error
423 }
424 }
425 }
426}
427
/// Guard returned by `start_operation`. On drop it decrements the backend's
/// `in_progress_operations` counter, unless `backend` is `None`.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    /// `None` when persisting is disabled; the guard then does nothing on drop.
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
431
432impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
433 fn drop(&mut self) {
434 if let Some(backend) = self.backend {
435 let fetch_sub = backend
436 .in_progress_operations
437 .fetch_sub(1, Ordering::AcqRel);
438 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
439 backend.operations_suspended.notify_all();
440 }
441 }
442 }
443}
444
/// Intermediate result of preparing the completion of a task execution.
///
/// NOTE(review): the code that produces and consumes this struct is not visible in this part of
/// the file; the field descriptions are derived from names and types only.
struct TaskExecutionCompletePrepareResult {
    /// Presumably the children that were newly connected by this execution — confirm.
    pub new_children: FxHashSet<TaskId>,
    /// Presumably whether the task became immutable with this execution — confirm.
    pub is_now_immutable: bool,
    /// Only present with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// Presumably the output to store, if it changed — confirm.
    pub new_output: Option<OutputValue>,
    /// Presumably the tasks depending on this task's output — confirm.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
    /// Presumably whether this execution was a recomputation — confirm.
    pub is_recomputation: bool,
}
455
456impl<B: BackingStorage> TurboTasksBackendInner<B> {
458 fn connect_child(
459 &self,
460 parent_task: Option<TaskId>,
461 child_task: TaskId,
462 task_type: Option<ArcOrOwned<CachedTaskType>>,
463 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
464 ) {
465 operation::ConnectChildOperation::run(
466 parent_task,
467 child_task,
468 task_type,
469 self.execute_context(turbo_tasks),
470 );
471 }
472
473 fn try_read_task_output(
474 self: &Arc<Self>,
475 task_id: TaskId,
476 reader: Option<TaskId>,
477 options: ReadOutputOptions,
478 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479 ) -> Result<Result<RawVc, EventListener>> {
480 self.assert_not_persistent_calling_transient(reader, task_id, None);
481
482 let mut ctx = self.execute_context(turbo_tasks);
483 let need_reader_task = if self.should_track_dependencies()
484 && !matches!(options.tracking, ReadTracking::Untracked)
485 && reader.is_some_and(|reader_id| reader_id != task_id)
486 && let Some(reader_id) = reader
487 && reader_id != task_id
488 {
489 Some(reader_id)
490 } else {
491 None
492 };
493 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
494 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
498 (task, Some(reader))
499 } else {
500 (ctx.task(task_id, TaskDataCategory::All), None)
501 };
502
503 fn listen_to_done_event(
504 reader_description: Option<EventDescription>,
505 tracking: ReadTracking,
506 done_event: &Event,
507 ) -> EventListener {
508 done_event.listen_with_note(move || {
509 move || {
510 if let Some(reader_description) = reader_description.as_ref() {
511 format!(
512 "try_read_task_output from {} ({})",
513 reader_description, tracking
514 )
515 } else {
516 format!("try_read_task_output ({})", tracking)
517 }
518 }
519 })
520 }
521
522 fn check_in_progress(
523 task: &impl TaskGuard,
524 reader_description: Option<EventDescription>,
525 tracking: ReadTracking,
526 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
527 {
528 match task.get_in_progress() {
529 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
530 listen_to_done_event(reader_description, tracking, done_event),
531 ))),
532 Some(InProgressState::InProgress(box InProgressStateInner {
533 done_event, ..
534 })) => Some(Ok(Err(listen_to_done_event(
535 reader_description,
536 tracking,
537 done_event,
538 )))),
539 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
540 "{} was canceled",
541 task.get_task_description()
542 ))),
543 None => None,
544 }
545 }
546
547 if matches!(options.consistency, ReadConsistency::Strong) {
548 loop {
550 let aggregation_number = get_aggregation_number(&task);
551 if is_root_node(aggregation_number) {
552 break;
553 }
554 drop(task);
555 drop(reader_task);
556 {
557 let _span = tracing::trace_span!(
558 "make root node for strongly consistent read",
559 %task_id
560 )
561 .entered();
562 AggregationUpdateQueue::run(
563 AggregationUpdateJob::UpdateAggregationNumber {
564 task_id,
565 base_aggregation_number: u32::MAX,
566 distance: None,
567 },
568 &mut ctx,
569 );
570 }
571 (task, reader_task) = if let Some(reader_id) = need_reader_task {
572 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
574 (task, Some(reader))
575 } else {
576 (ctx.task(task_id, TaskDataCategory::All), None)
577 }
578 }
579
580 let is_dirty = task.is_dirty();
581
582 let has_dirty_containers = task.has_dirty_containers();
584 if has_dirty_containers || is_dirty.is_some() {
585 let activeness = task.get_activeness_mut();
586 let mut task_ids_to_schedule: Vec<_> = Vec::new();
587 let activeness = if let Some(activeness) = activeness {
589 activeness.set_active_until_clean();
593 activeness
594 } else {
595 if ctx.should_track_activeness() {
599 task_ids_to_schedule = task.dirty_containers().collect();
601 task_ids_to_schedule.push(task_id);
602 }
603 let activeness =
604 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
605 activeness.set_active_until_clean();
606 activeness
607 };
608 let listener = activeness.all_clean_event.listen_with_note(move || {
609 let this = self.clone();
610 let tt = turbo_tasks.pin();
611 move || {
612 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
613 let mut ctx = this.execute_context(tt);
614 let mut visited = FxHashSet::default();
615 fn indent(s: &str) -> String {
616 s.split_inclusive('\n')
617 .flat_map(|line: &str| [" ", line].into_iter())
618 .collect::<String>()
619 }
620 fn get_info(
621 ctx: &mut impl ExecuteContext<'_>,
622 task_id: TaskId,
623 parent_and_count: Option<(TaskId, i32)>,
624 visited: &mut FxHashSet<TaskId>,
625 ) -> String {
626 let task = ctx.task(task_id, TaskDataCategory::All);
627 let is_dirty = task.is_dirty();
628 let in_progress =
629 task.get_in_progress()
630 .map_or("not in progress", |p| match p {
631 InProgressState::InProgress(_) => "in progress",
632 InProgressState::Scheduled { .. } => "scheduled",
633 InProgressState::Canceled => "canceled",
634 });
635 let activeness = task.get_activeness().map_or_else(
636 || "not active".to_string(),
637 |activeness| format!("{activeness:?}"),
638 );
639 let aggregation_number = get_aggregation_number(&task);
640 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
641 {
642 let uppers = get_uppers(&task);
643 !uppers.contains(&parent_task_id)
644 } else {
645 false
646 };
647
648 let has_dirty_containers = task.has_dirty_containers();
650
651 let task_description = task.get_task_description();
652 let is_dirty_label = if let Some(parent_priority) = is_dirty {
653 format!(", dirty({parent_priority})")
654 } else {
655 String::new()
656 };
657 let has_dirty_containers_label = if has_dirty_containers {
658 ", dirty containers"
659 } else {
660 ""
661 };
662 let count = if let Some((_, count)) = parent_and_count {
663 format!(" {count}")
664 } else {
665 String::new()
666 };
667 let mut info = format!(
668 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
669 {in_progress}, \
670 {activeness}{is_dirty_label}{has_dirty_containers_label})",
671 );
672 let children: Vec<_> = task.dirty_containers_with_count().collect();
673 drop(task);
674
675 if missing_upper {
676 info.push_str("\n ERROR: missing upper connection");
677 }
678
679 if has_dirty_containers || !children.is_empty() {
680 writeln!(info, "\n dirty tasks:").unwrap();
681
682 for (child_task_id, count) in children {
683 let task_description = ctx
684 .task(child_task_id, TaskDataCategory::Data)
685 .get_task_description();
686 if visited.insert(child_task_id) {
687 let child_info = get_info(
688 ctx,
689 child_task_id,
690 Some((task_id, count)),
691 visited,
692 );
693 info.push_str(&indent(&child_info));
694 if !info.ends_with('\n') {
695 info.push('\n');
696 }
697 } else {
698 writeln!(
699 info,
700 " {child_task_id} {task_description} {count} \
701 (already visited)"
702 )
703 .unwrap();
704 }
705 }
706 }
707 info
708 }
709 let info = get_info(&mut ctx, task_id, None, &mut visited);
710 format!(
711 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
712 )
713 }
714 });
715 drop(reader_task);
716 drop(task);
717 if !task_ids_to_schedule.is_empty() {
718 let mut queue = AggregationUpdateQueue::new();
719 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
720 queue.execute(&mut ctx);
721 }
722
723 return Ok(Err(listener));
724 }
725 }
726
727 let reader_description = reader_task
728 .as_ref()
729 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
730 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
731 {
732 return value;
733 }
734
735 if let Some(output) = task.get_output() {
736 let result = match output {
737 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
738 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
739 OutputValue::Error(error) => Err(error.clone()),
740 };
741 if let Some(mut reader_task) = reader_task.take()
742 && options.tracking.should_track(result.is_err())
743 && (!task.immutable() || cfg!(feature = "verify_immutable"))
744 {
745 #[cfg(feature = "trace_task_output_dependencies")]
746 let _span = tracing::trace_span!(
747 "add output dependency",
748 task = %task_id,
749 dependent_task = ?reader
750 )
751 .entered();
752 let mut queue = LeafDistanceUpdateQueue::new();
753 let reader = reader.unwrap();
754 if task.add_output_dependent(reader) {
755 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
757 let reader_leaf_distance =
758 reader_task.get_leaf_distance().copied().unwrap_or_default();
759 if reader_leaf_distance.distance <= leaf_distance.distance {
760 queue.push(
761 reader,
762 leaf_distance.distance,
763 leaf_distance.max_distance_in_buffer,
764 );
765 }
766 }
767
768 drop(task);
769
770 if !reader_task.remove_outdated_output_dependencies(&task_id) {
776 let _ = reader_task.add_output_dependencies(task_id);
777 }
778 drop(reader_task);
779
780 queue.execute(&mut ctx);
781 } else {
782 drop(task);
783 }
784
785 return result.map_err(|error| {
786 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
787 .with_task_context(task_id, turbo_tasks.pin())
788 .into()
789 });
790 }
791 drop(reader_task);
792
793 let note = EventDescription::new(|| {
794 move || {
795 if let Some(reader) = reader_description.as_ref() {
796 format!("try_read_task_output (recompute) from {reader}",)
797 } else {
798 "try_read_task_output (recompute, untracked)".to_string()
799 }
800 }
801 });
802
803 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
805 TaskExecutionReason::OutputNotAvailable,
806 EventDescription::new(|| task.get_task_desc_fn()),
807 note,
808 );
809
810 let old = task.set_in_progress(in_progress_state);
813 debug_assert!(old.is_none(), "InProgress already exists");
814 ctx.schedule_task(task, TaskPriority::Initial);
815
816 Ok(Err(listener))
817 }
818
819 fn try_read_task_cell(
820 &self,
821 task_id: TaskId,
822 reader: Option<TaskId>,
823 cell: CellId,
824 options: ReadCellOptions,
825 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
826 ) -> Result<Result<TypedCellContent, EventListener>> {
827 self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
828
829 fn add_cell_dependency(
830 task_id: TaskId,
831 mut task: impl TaskGuard,
832 reader: Option<TaskId>,
833 reader_task: Option<impl TaskGuard>,
834 cell: CellId,
835 key: Option<u64>,
836 ) {
837 if let Some(mut reader_task) = reader_task
838 && (!task.immutable() || cfg!(feature = "verify_immutable"))
839 {
840 let reader = reader.unwrap();
841 let _ = task.add_cell_dependents((cell, key, reader));
842 drop(task);
843
844 let target = CellRef {
850 task: task_id,
851 cell,
852 };
853 if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
854 let _ = reader_task.add_cell_dependencies((target, key));
855 }
856 drop(reader_task);
857 }
858 }
859
860 let ReadCellOptions {
861 is_serializable_cell_content,
862 tracking,
863 final_read_hint,
864 } = options;
865
866 let mut ctx = self.execute_context(turbo_tasks);
867 let (mut task, reader_task) = if self.should_track_dependencies()
868 && !matches!(tracking, ReadCellTracking::Untracked)
869 && let Some(reader_id) = reader
870 && reader_id != task_id
871 {
872 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
876 (task, Some(reader))
877 } else {
878 (ctx.task(task_id, TaskDataCategory::All), None)
879 };
880
881 let content = if final_read_hint {
882 task.remove_cell_data(is_serializable_cell_content, cell)
883 } else {
884 task.get_cell_data(is_serializable_cell_content, cell)
885 };
886 if let Some(content) = content {
887 if tracking.should_track(false) {
888 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
889 }
890 return Ok(Ok(TypedCellContent(
891 cell.type_id,
892 CellContent(Some(content.reference)),
893 )));
894 }
895
896 let in_progress = task.get_in_progress();
897 if matches!(
898 in_progress,
899 Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
900 ) {
901 return Ok(Err(self
902 .listen_to_cell(&mut task, task_id, &reader_task, cell)
903 .0));
904 }
905 let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
906
907 let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
909 let Some(max_id) = max_id else {
910 let task_desc = task.get_task_description();
911 if tracking.should_track(true) {
912 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
913 }
914 bail!(
915 "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
916 );
917 };
918 if cell.index >= max_id {
919 let task_desc = task.get_task_description();
920 if tracking.should_track(true) {
921 add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
922 }
923 bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
924 }
925
926 if is_cancelled {
932 bail!("{} was canceled", task.get_task_description());
933 }
934
935 let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
937 drop(reader_task);
938 if !new_listener {
939 return Ok(Err(listener));
940 }
941
942 let _span = tracing::trace_span!(
943 "recomputation",
944 cell_type = get_value_type(cell.type_id).ty.global_name,
945 cell_index = cell.index
946 )
947 .entered();
948
949 let _ = task.add_scheduled(
950 TaskExecutionReason::CellNotAvailable,
951 EventDescription::new(|| task.get_task_desc_fn()),
952 );
953 ctx.schedule_task(task, TaskPriority::Initial);
954
955 Ok(Err(listener))
956 }
957
958 fn listen_to_cell(
959 &self,
960 task: &mut impl TaskGuard,
961 task_id: TaskId,
962 reader_task: &Option<impl TaskGuard>,
963 cell: CellId,
964 ) -> (EventListener, bool) {
965 let note = || {
966 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
967 move || {
968 if let Some(reader_desc) = reader_desc.as_ref() {
969 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
970 } else {
971 "try_read_task_cell (in progress, untracked)".to_string()
972 }
973 }
974 };
975 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
976 let listener = in_progress.event.listen_with_note(note);
978 return (listener, false);
979 }
980 let in_progress = InProgressCellState::new(task_id, cell);
981 let listener = in_progress.event.listen_with_note(note);
982 let old = task.insert_in_progress_cells(cell, in_progress);
983 debug_assert!(old.is_none(), "InProgressCell already exists");
984 (listener, true)
985 }
986
987 fn snapshot_and_persist(
988 &self,
989 parent_span: Option<tracing::Id>,
990 reason: &str,
991 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
992 ) -> Result<(Instant, bool), anyhow::Error> {
993 let snapshot_span =
994 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
995 .entered();
996 let start = Instant::now();
997 let wall_start = SystemTime::now();
1001 debug_assert!(self.should_persist());
1002
1003 let suspended_operations;
1004 {
1005 let _span = tracing::info_span!("blocking").entered();
1006 let mut snapshot_request = self.snapshot_request.lock();
1007 snapshot_request.snapshot_requested = true;
1008 let active_operations = self
1009 .in_progress_operations
1010 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1011 if active_operations != 0 {
1012 self.operations_suspended
1013 .wait_while(&mut snapshot_request, |_| {
1014 self.in_progress_operations.load(Ordering::Relaxed)
1015 != SNAPSHOT_REQUESTED_BIT
1016 });
1017 }
1018 suspended_operations = snapshot_request
1019 .suspended_operations
1020 .iter()
1021 .map(|op| op.arc().clone())
1022 .collect::<Vec<_>>();
1023 }
1024 let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
1027 let mut snapshot_request = self.snapshot_request.lock();
1028 snapshot_request.snapshot_requested = false;
1029 self.in_progress_operations
1030 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1031 self.snapshot_completed.notify_all();
1032 let snapshot_time = Instant::now();
1033 drop(snapshot_request);
1034
1035 if !has_modifications {
1036 drop(snapshot_guard);
1039 return Ok((start, false));
1040 }
1041
1042 #[cfg(feature = "print_cache_item_size")]
1043 #[derive(Default)]
1044 struct TaskCacheStats {
1045 data: usize,
1046 #[cfg(feature = "print_cache_item_size_with_compressed")]
1047 data_compressed: usize,
1048 data_count: usize,
1049 meta: usize,
1050 #[cfg(feature = "print_cache_item_size_with_compressed")]
1051 meta_compressed: usize,
1052 meta_count: usize,
1053 upper_count: usize,
1054 collectibles_count: usize,
1055 aggregated_collectibles_count: usize,
1056 children_count: usize,
1057 followers_count: usize,
1058 collectibles_dependents_count: usize,
1059 aggregated_dirty_containers_count: usize,
1060 output_size: usize,
1061 }
1062 #[cfg(feature = "print_cache_item_size")]
1065 struct FormatSizes {
1066 size: usize,
1067 #[cfg(feature = "print_cache_item_size_with_compressed")]
1068 compressed_size: usize,
1069 }
1070 #[cfg(feature = "print_cache_item_size")]
1071 impl std::fmt::Display for FormatSizes {
1072 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1073 use turbo_tasks::util::FormatBytes;
1074 #[cfg(feature = "print_cache_item_size_with_compressed")]
1075 {
1076 write!(
1077 f,
1078 "{} ({} compressed)",
1079 FormatBytes(self.size),
1080 FormatBytes(self.compressed_size)
1081 )
1082 }
1083 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1084 {
1085 write!(f, "{}", FormatBytes(self.size))
1086 }
1087 }
1088 }
#[cfg(feature = "print_cache_item_size")]
impl TaskCacheStats {
    /// Runs `data` through an LZ4 compressor into a throwaway buffer and returns the
    /// size value reported by the compressor.
    #[cfg(feature = "print_cache_item_size_with_compressed")]
    fn compressed_size(data: &[u8]) -> Result<usize> {
        let mut scratch = Vec::new();
        let mut compressor = lzzzz::lz4::Compressor::new()?;
        let written = compressor.next_to_vec(data, &mut scratch, lzzzz::lz4::ACC_LEVEL_DEFAULT)?;
        Ok(written)
    }

    /// Accounts for one encoded "data" blob: bumps the item count and the byte totals.
    /// A failed compression contributes `0` to the compressed total.
    fn add_data(&mut self, data: &[u8]) {
        self.data_count += 1;
        self.data += data.len();
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        {
            self.data_compressed += Self::compressed_size(data).unwrap_or_default();
        }
    }

    /// Accounts for one encoded "meta" blob: bumps the item count and the byte totals.
    /// A failed compression contributes `0` to the compressed total.
    fn add_meta(&mut self, data: &[u8]) {
        self.meta_count += 1;
        self.meta += data.len();
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        {
            self.meta_compressed += Self::compressed_size(data).unwrap_or_default();
        }
    }

    /// Adds the per-task meta counters of `storage` to the running totals and, when the task
    /// has an output, the length of its bincode encoding (`0` if encoding fails).
    fn add_counts(&mut self, storage: &TaskStorage) {
        let counts = storage.meta_counts();
        self.upper_count += counts.upper;
        self.collectibles_count += counts.collectibles;
        self.aggregated_collectibles_count += counts.aggregated_collectibles;
        self.children_count += counts.children;
        self.followers_count += counts.followers;
        self.collectibles_dependents_count += counts.collectibles_dependents;
        self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;

        if let Some(output) = storage.get_output() {
            let encoded_len = match turbo_bincode::turbo_bincode_encode(&output) {
                Ok(encoded) => encoded.len(),
                Err(_) => 0,
            };
            self.output_size += encoded_len;
        }
    }

    /// Returns the display form of the task's persistent task type, or `"<unknown>"` when the
    /// storage has none.
    fn task_name(storage: &TaskStorage) -> String {
        match storage.get_persistent_task_type() {
            Some(task_type) => task_type.to_string(),
            None => "<unknown>".to_string(),
        }
    }

    /// Key used to order stats rows: the compressed total when compressed sizes are compiled
    /// in, the raw total otherwise.
    fn sort_key(&self) -> usize {
        #[cfg(feature = "print_cache_item_size_with_compressed")]
        let key = self.data_compressed + self.meta_compressed;
        #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
        let key = self.data + self.meta;
        key
    }

    /// Combined data + meta size, ready for display.
    fn format_total(&self) -> FormatSizes {
        let size = self.data + self.meta;
        FormatSizes {
            size,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.data_compressed + self.meta_compressed,
        }
    }

    /// Total size of all data blobs, ready for display.
    fn format_data(&self) -> FormatSizes {
        let size = self.data;
        FormatSizes {
            size,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.data_compressed,
        }
    }

    /// Average size per data blob (`0` when no data blob was recorded).
    fn format_avg_data(&self) -> FormatSizes {
        let blobs = self.data_count;
        FormatSizes {
            size: self.data.checked_div(blobs).unwrap_or_default(),
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.data_compressed.checked_div(blobs).unwrap_or_default(),
        }
    }

    /// Total size of all meta blobs, ready for display.
    fn format_meta(&self) -> FormatSizes {
        let size = self.meta;
        FormatSizes {
            size,
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.meta_compressed,
        }
    }

    /// Average size per meta blob (`0` when no meta blob was recorded).
    fn format_avg_meta(&self) -> FormatSizes {
        let blobs = self.meta_count;
        FormatSizes {
            size: self.meta.checked_div(blobs).unwrap_or_default(),
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            compressed_size: self.meta_compressed.checked_div(blobs).unwrap_or_default(),
        }
    }
}
1203 #[cfg(feature = "print_cache_item_size")]
1204 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1205 Mutex::new(FxHashMap::default());
1206
1207 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1214 let encode_category = |task_id: TaskId,
1215 data: &TaskStorage,
1216 category: SpecificTaskDataCategory,
1217 buffer: &mut TurboBincodeBuffer|
1218 -> Option<TurboBincodeBuffer> {
1219 match encode_task_data(task_id, data, category, buffer) {
1220 Ok(encoded) => {
1221 #[cfg(feature = "print_cache_item_size")]
1222 {
1223 let mut stats = task_cache_stats.lock();
1224 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1225 match category {
1226 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1227 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1228 }
1229 }
1230 Some(encoded)
1231 }
1232 Err(err) => {
1233 panic!(
1234 "Serializing task {} failed ({:?}): {:?}",
1235 self.debug_get_task_description(task_id),
1236 category,
1237 err
1238 );
1239 }
1240 }
1241 };
1242 if task_id.is_transient() {
1243 unreachable!("transient task_ids should never be enqueued to be persisted");
1244 }
1245
1246 let encode_meta = inner.flags.meta_modified();
1247 let encode_data = inner.flags.data_modified();
1248
1249 #[cfg(feature = "print_cache_item_size")]
1250 if encode_data || encode_meta {
1251 task_cache_stats
1252 .lock()
1253 .entry(TaskCacheStats::task_name(inner))
1254 .or_default()
1255 .add_counts(inner);
1256 }
1257
1258 let meta = if encode_meta {
1259 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1260 } else {
1261 None
1262 };
1263
1264 let data = if encode_data {
1265 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1266 } else {
1267 None
1268 };
1269 let task_type_hash = if inner.flags.new_task() {
1270 let task_type = inner.get_persistent_task_type().expect(
1271 "It is not possible for a new_task to not have a persistent_task_type. Task \
1272 creation for persistent tasks uses a single ExecutionContextImpl for \
1273 creating the task (which sets new_task) and connect_child (which sets \
1274 persistent_task_type) and take_snapshot waits for all operations to complete \
1275 or suspend before we start snapshotting. So task creation will always set \
1276 the task_type.",
1277 );
1278 Some(compute_task_type_hash(task_type))
1279 } else {
1280 None
1281 };
1282
1283 SnapshotItem {
1284 task_id,
1285 meta,
1286 data,
1287 task_type_hash,
1288 }
1289 };
1290
1291 let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
1292
1293 drop(snapshot_span);
1294 let snapshot_duration = start.elapsed();
1295 let task_count = task_snapshots.len();
1296
1297 if task_snapshots.is_empty() {
1298 std::hint::cold_path();
1301 return Ok((snapshot_time, false));
1302 }
1303
1304 let persist_start = Instant::now();
1305 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1306 {
1307 self.backing_storage
1311 .save_snapshot(suspended_operations, task_snapshots)?;
1312 #[cfg(feature = "print_cache_item_size")]
1313 {
1314 let mut task_cache_stats = task_cache_stats
1315 .into_inner()
1316 .into_iter()
1317 .collect::<Vec<_>>();
1318 if !task_cache_stats.is_empty() {
1319 use turbo_tasks::util::FormatBytes;
1320
1321 use crate::utils::markdown_table::print_markdown_table;
1322
1323 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1324 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1325 });
1326
1327 println!(
1328 "Task cache stats: {}",
1329 FormatSizes {
1330 size: task_cache_stats
1331 .iter()
1332 .map(|(_, s)| s.data + s.meta)
1333 .sum::<usize>(),
1334 #[cfg(feature = "print_cache_item_size_with_compressed")]
1335 compressed_size: task_cache_stats
1336 .iter()
1337 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1338 .sum::<usize>()
1339 },
1340 );
1341
1342 print_markdown_table(
1343 [
1344 "Task",
1345 " Total Size",
1346 " Data Size",
1347 " Data Count x Avg",
1348 " Data Count x Avg",
1349 " Meta Size",
1350 " Meta Count x Avg",
1351 " Meta Count x Avg",
1352 " Uppers",
1353 " Coll",
1354 " Agg Coll",
1355 " Children",
1356 " Followers",
1357 " Coll Deps",
1358 " Agg Dirty",
1359 " Output Size",
1360 ],
1361 task_cache_stats.iter(),
1362 |(task_desc, stats)| {
1363 [
1364 task_desc.to_string(),
1365 format!(" {}", stats.format_total()),
1366 format!(" {}", stats.format_data()),
1367 format!(" {} x", stats.data_count),
1368 format!("{}", stats.format_avg_data()),
1369 format!(" {}", stats.format_meta()),
1370 format!(" {} x", stats.meta_count),
1371 format!("{}", stats.format_avg_meta()),
1372 format!(" {}", stats.upper_count),
1373 format!(" {}", stats.collectibles_count),
1374 format!(" {}", stats.aggregated_collectibles_count),
1375 format!(" {}", stats.children_count),
1376 format!(" {}", stats.followers_count),
1377 format!(" {}", stats.collectibles_dependents_count),
1378 format!(" {}", stats.aggregated_dirty_containers_count),
1379 format!(" {}", FormatBytes(stats.output_size)),
1380 ]
1381 },
1382 );
1383 }
1384 }
1385 }
1386
1387 let elapsed = start.elapsed();
1388 let persist_duration = persist_start.elapsed();
1389 if elapsed > Duration::from_secs(10) {
1391 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1392 "Finished writing to filesystem cache".to_string(),
1393 elapsed,
1394 )));
1395 }
1396
1397 let wall_start_ms = wall_start
1398 .duration_since(SystemTime::UNIX_EPOCH)
1399 .unwrap_or_default()
1400 .as_secs_f64()
1402 * 1000.0;
1403 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1404 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1405 "turbopack-persistence",
1406 wall_start_ms,
1407 wall_end_ms,
1408 vec![
1409 ("reason", serde_json::Value::from(reason)),
1410 (
1411 "snapshot_duration_ms",
1412 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1413 ),
1414 (
1415 "persist_duration_ms",
1416 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1417 ),
1418 ("task_count", serde_json::Value::from(task_count)),
1419 ],
1420 )));
1421
1422 Ok((snapshot_time, true))
1423 }
1424
1425 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1426 if self.should_restore() {
1427 let uncompleted_operations = self
1431 .backing_storage
1432 .uncompleted_operations()
1433 .expect("Failed to get uncompleted operations");
1434 if !uncompleted_operations.is_empty() {
1435 let mut ctx = self.execute_context(turbo_tasks);
1436 for op in uncompleted_operations {
1437 op.execute(&mut ctx);
1438 }
1439 }
1440 }
1441
1442 if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
1445 let _span = trace_span!("persisting background job").entered();
1447 let _span = tracing::info_span!("thread").entered();
1448 turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
1449 }
1450 }
1451
/// Marks the backend as stopping and wakes everything listening on `stopping_event`.
fn stopping(&self) {
    // The flag is stored before the notification; presumably so that woken listeners already
    // observe `stopping == true` (NOTE(review): confirm against the listeners' load ordering).
    self.stopping.store(true, Ordering::Release);
    self.stopping_event.notify(usize::MAX);
}
1456
1457 #[allow(unused_variables)]
1458 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1459 #[cfg(feature = "verify_aggregation_graph")]
1460 {
1461 self.is_idle.store(false, Ordering::Release);
1462 self.verify_aggregation_graph(turbo_tasks, false);
1463 }
1464 if self.should_persist()
1465 && let Err(err) = self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks)
1466 {
1467 eprintln!("Persisting failed during shutdown: {err:?}");
1468 }
1469 drop_contents(&self.task_cache);
1470 self.storage.drop_contents();
1471 if let Err(err) = self.backing_storage.shutdown() {
1472 println!("Shutting down failed: {err}");
1473 }
1474 }
1475
1476 #[allow(unused_variables)]
1477 fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1478 self.idle_start_event.notify(usize::MAX);
1479
1480 #[cfg(feature = "verify_aggregation_graph")]
1481 {
1482 use tokio::select;
1483
1484 self.is_idle.store(true, Ordering::Release);
1485 let this = self.clone();
1486 let turbo_tasks = turbo_tasks.pin();
1487 tokio::task::spawn(async move {
1488 select! {
1489 _ = tokio::time::sleep(Duration::from_secs(5)) => {
1490 }
1492 _ = this.idle_end_event.listen() => {
1493 return;
1494 }
1495 }
1496 if !this.is_idle.load(Ordering::Relaxed) {
1497 return;
1498 }
1499 this.verify_aggregation_graph(&*turbo_tasks, true);
1500 });
1501 }
1502 }
1503
/// Called when the idle period ends; wakes all listeners of `idle_end_event`.
///
/// With the `verify_aggregation_graph` feature the `is_idle` flag is cleared first.
fn idle_end(&self) {
    #[cfg(feature = "verify_aggregation_graph")]
    self.is_idle.store(false, Ordering::Release);
    self.idle_end_event.notify(usize::MAX);
}
1509
/// Returns the id of the persistent task for `task_type`, creating the task if it does not
/// exist yet, and connects it as a child of `parent_task`.
///
/// Lookup order:
/// 1. the in-memory `task_cache`;
/// 2. `ctx.task_by_type` (NOTE(review): presumably the backing storage — confirm);
/// 3. otherwise a fresh id from `persisted_task_id_factory`, unless another caller inserted
///    the same type into the cache in the meantime.
///
/// A newly created task whose native function is marked `is_root` gets an
/// `UpdateAggregationNumber` job with `base_aggregation_number: u32::MAX` before it is
/// connected.
fn get_or_create_persistent_task(
    &self,
    task_type: CachedTaskType,
    parent_task: Option<TaskId>,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> TaskId {
    let is_root = task_type.native_fn.is_root;

    // Fast path: the task is already in the in-memory cache. `.map(|r| *r)` copies the id out
    // of the cache reference so that reference is gone before `connect_child` runs.
    if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
        self.track_cache_hit(&task_type);
        self.connect_child(
            parent_task,
            task_id,
            Some(ArcOrOwned::Owned(task_type)),
            turbo_tasks,
        );
        return task_id;
    }

    let mut ctx = self.execute_context(turbo_tasks);

    // Set only when this call allocates a brand-new task id.
    let mut is_new = false;
    let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
        // The task is known to `ctx` but missing from the in-memory cache: counts as a hit,
        // and the cache entry is filled in unless somebody else already did.
        self.track_cache_hit(&task_type);
        let task_type = match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                e.insert(task_type.clone(), task_id);
                ArcOrOwned::Arc(task_type)
            }
        };
        (task_id, task_type)
    } else {
        let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                // The entry appeared since the fast-path lookup: reuse its id.
                let task_id = *e.get();
                // Release the cache entry before doing any further work.
                drop(e);
                self.track_cache_hit(&task_type);
                (task_id, ArcOrOwned::Owned(task_type))
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                // Storage is initialized before the id is published through the cache.
                self.storage.initialize_new_task(task_id);
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);
                is_new = true;
                (task_id, ArcOrOwned::Arc(task_type))
            }
        };
        (task_id, task_type)
    };
    if is_new && is_root {
        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id,
                base_aggregation_number: u32::MAX,
                distance: None,
            },
            &mut ctx,
        );
    }
    // The execute context created above is consumed by the connect operation.
    operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

    task_id
}
1593
/// Returns the id of the transient task for `task_type`, creating the task if it does not
/// exist yet, and connects it as a child of `parent_task`.
///
/// A non-transient `parent_task` is reported through `panic_persistent_calling_transient`
/// before any lookup happens. New tasks take their id from `transient_task_id_factory`; a new
/// task whose native function is marked `is_root` gets an `UpdateAggregationNumber` job with
/// `base_aggregation_number: u32::MAX` before it is connected.
fn get_or_create_transient_task(
    &self,
    task_type: CachedTaskType,
    parent_task: Option<TaskId>,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> TaskId {
    let is_root = task_type.native_fn.is_root;

    if let Some(parent_task) = parent_task
        && !parent_task.is_transient()
    {
        // NOTE(review): the helper's name suggests it panics; its body is not visible here.
        self.panic_persistent_calling_transient(
            self.debug_get_task_description(parent_task),
            Some(&task_type),
            None,
        );
    }
    // Fast path: the task is already in the in-memory cache. `.map(|r| *r)` copies the id out
    // of the cache reference so that reference is gone before `connect_child` runs.
    if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
        self.track_cache_hit(&task_type);
        self.connect_child(
            parent_task,
            task_id,
            Some(ArcOrOwned::Owned(task_type)),
            turbo_tasks,
        );
        return task_id;
    }
    match raw_entry(&self.task_cache, &task_type) {
        RawEntry::Occupied(e) => {
            // The entry appeared since the fast-path lookup: reuse its id.
            let task_id = *e.get();
            // Release the cache entry before connecting.
            drop(e);
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            task_id
        }
        RawEntry::Vacant(e) => {
            let task_type = Arc::new(task_type);
            let task_id = self.transient_task_id_factory.get();
            // Storage is initialized before the id is published through the cache.
            self.storage.initialize_new_task(task_id);
            e.insert(task_type.clone(), task_id);
            self.track_cache_miss(&task_type);

            if is_root {
                let mut ctx = self.execute_context(turbo_tasks);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
            }

            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Arc(task_type)),
                turbo_tasks,
            );

            task_id
        }
    }
}
1669
1670 fn debug_trace_transient_task(
1673 &self,
1674 task_type: &CachedTaskType,
1675 cell_id: Option<CellId>,
1676 ) -> DebugTraceTransientTask {
1677 fn inner_id(
1680 backend: &TurboTasksBackendInner<impl BackingStorage>,
1681 task_id: TaskId,
1682 cell_type_id: Option<ValueTypeId>,
1683 visited_set: &mut FxHashSet<TaskId>,
1684 ) -> DebugTraceTransientTask {
1685 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1686 if visited_set.contains(&task_id) {
1687 let task_name = task_type.get_name();
1688 DebugTraceTransientTask::Collapsed {
1689 task_name,
1690 cell_type_id,
1691 }
1692 } else {
1693 inner_cached(backend, &task_type, cell_type_id, visited_set)
1694 }
1695 } else {
1696 DebugTraceTransientTask::Uncached { cell_type_id }
1697 }
1698 }
1699 fn inner_cached(
1700 backend: &TurboTasksBackendInner<impl BackingStorage>,
1701 task_type: &CachedTaskType,
1702 cell_type_id: Option<ValueTypeId>,
1703 visited_set: &mut FxHashSet<TaskId>,
1704 ) -> DebugTraceTransientTask {
1705 let task_name = task_type.get_name();
1706
1707 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1708 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1709 return None;
1713 };
1714 if task_id.is_transient() {
1715 Some(Box::new(inner_id(
1716 backend,
1717 task_id,
1718 cause_self_raw_vc.try_get_type_id(),
1719 visited_set,
1720 )))
1721 } else {
1722 None
1723 }
1724 });
1725 let cause_args = task_type
1726 .arg
1727 .get_raw_vcs()
1728 .into_iter()
1729 .filter_map(|raw_vc| {
1730 let Some(task_id) = raw_vc.try_get_task_id() else {
1731 return None;
1733 };
1734 if !task_id.is_transient() {
1735 return None;
1736 }
1737 Some((task_id, raw_vc.try_get_type_id()))
1738 })
1739 .collect::<IndexSet<_>>() .into_iter()
1741 .map(|(task_id, cell_type_id)| {
1742 inner_id(backend, task_id, cell_type_id, visited_set)
1743 })
1744 .collect();
1745
1746 DebugTraceTransientTask::Cached {
1747 task_name,
1748 cell_type_id,
1749 cause_self,
1750 cause_args,
1751 }
1752 }
1753 inner_cached(
1754 self,
1755 task_type,
1756 cell_id.map(|c| c.type_id),
1757 &mut FxHashSet::default(),
1758 )
1759 }
1760
1761 fn invalidate_task(
1762 &self,
1763 task_id: TaskId,
1764 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1765 ) {
1766 if !self.should_track_dependencies() {
1767 panic!("Dependency tracking is disabled so invalidation is not allowed");
1768 }
1769 operation::InvalidateOperation::run(
1770 smallvec![task_id],
1771 #[cfg(feature = "trace_task_dirty")]
1772 TaskDirtyCause::Invalidator,
1773 self.execute_context(turbo_tasks),
1774 );
1775 }
1776
1777 fn invalidate_tasks(
1778 &self,
1779 tasks: &[TaskId],
1780 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1781 ) {
1782 if !self.should_track_dependencies() {
1783 panic!("Dependency tracking is disabled so invalidation is not allowed");
1784 }
1785 operation::InvalidateOperation::run(
1786 tasks.iter().copied().collect(),
1787 #[cfg(feature = "trace_task_dirty")]
1788 TaskDirtyCause::Unknown,
1789 self.execute_context(turbo_tasks),
1790 );
1791 }
1792
1793 fn invalidate_tasks_set(
1794 &self,
1795 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1796 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1797 ) {
1798 if !self.should_track_dependencies() {
1799 panic!("Dependency tracking is disabled so invalidation is not allowed");
1800 }
1801 operation::InvalidateOperation::run(
1802 tasks.iter().copied().collect(),
1803 #[cfg(feature = "trace_task_dirty")]
1804 TaskDirtyCause::Unknown,
1805 self.execute_context(turbo_tasks),
1806 );
1807 }
1808
1809 fn invalidate_serialization(
1810 &self,
1811 task_id: TaskId,
1812 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1813 ) {
1814 if task_id.is_transient() {
1815 return;
1816 }
1817 let mut ctx = self.execute_context(turbo_tasks);
1818 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1819 task.invalidate_serialization();
1820 }
1821
1822 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1823 let task = self.storage.access_mut(task_id);
1824 if let Some(value) = task.get_persistent_task_type() {
1825 format!("{task_id:?} {}", value)
1826 } else if let Some(value) = task.get_transient_task_type() {
1827 format!("{task_id:?} {}", value)
1828 } else {
1829 format!("{task_id:?} unknown")
1830 }
1831 }
1832
1833 fn get_task_name(
1834 &self,
1835 task_id: TaskId,
1836 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1837 ) -> String {
1838 let mut ctx = self.execute_context(turbo_tasks);
1839 let task = ctx.task(task_id, TaskDataCategory::Data);
1840 if let Some(value) = task.get_persistent_task_type() {
1841 value.to_string()
1842 } else if let Some(value) = task.get_transient_task_type() {
1843 value.to_string()
1844 } else {
1845 "unknown".to_string()
1846 }
1847 }
1848
1849 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1850 let task = self.storage.access_mut(task_id);
1851 task.get_persistent_task_type().cloned()
1852 }
1853
/// Handles the cancellation of `task_id`'s execution.
///
/// Wakes everything waiting on the task (the `done_event` of its previous in-progress state
/// and the events of all in-progress cells) and leaves the task in
/// [`InProgressState::Canceled`]. For non-transient tasks, when dependency tracking is
/// enabled, the dirty state is updated to [`Dirtyness::SessionDependent`] and the resulting
/// aggregation update (if any) is run.
fn task_execution_canceled(
    &self,
    task_id: TaskId,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) {
    let mut ctx = self.execute_context(turbo_tasks);
    let mut task = ctx.task(task_id, TaskDataCategory::All);
    // Remove whatever in-progress state exists and wake its waiters.
    if let Some(in_progress) = task.take_in_progress() {
        match in_progress {
            InProgressState::Scheduled {
                done_event,
                reason: _,
            } => done_event.notify(usize::MAX),
            InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                done_event.notify(usize::MAX)
            }
            InProgressState::Canceled => {}
        }
    }
    // Take the in-progress cells out of the task and wake anyone listening on them. The taken
    // value is kept alive until the end of the function (see the explicit drop below).
    let in_progress_cells = task.take_in_progress_cells();
    if let Some(ref cells) = in_progress_cells {
        for state in cells.values() {
            state.event.notify(usize::MAX);
        }
    }

    let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
        task.update_dirty_state(Some(Dirtyness::SessionDependent))
    } else {
        None
    };

    let old = task.set_in_progress(InProgressState::Canceled);
    debug_assert!(old.is_none(), "InProgress already exists");
    // Release the task guard before running the aggregation update with `ctx`.
    drop(task);

    if let Some(data_update) = data_update {
        AggregationUpdateQueue::run(data_update, &mut ctx);
    }

    // Dropped only now, after the task guard has been released.
    drop(in_progress_cells);
}
1903
/// Tries to move `task_id` from "scheduled" to "in progress" and returns what is needed to
/// execute it (the future and its tracing span).
///
/// Returns `None` when
/// - the task has no in-progress state,
/// - its in-progress state is not `Scheduled` (the state is put back unchanged), or
/// - it is a `TransientTask::Once` whose future has already been taken.
///
/// On success, before the future is created, the task's current collectibles are copied to
/// the outdated collectibles and, when dependency tracking is enabled, its current cell and
/// output dependencies are stored as the outdated ones.
fn try_start_task_execution(
    &self,
    task_id: TaskId,
    priority: TaskPriority,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> Option<TaskExecutionSpec<'_>> {
    let execution_reason;
    let task_type;
    {
        // Everything that needs the task guard happens inside this scope.
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        task_type = task.get_task_type().to_owned();
        let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
        if let Some(tasks) = task.prefetch() {
            // The guard is released while the prefetched tasks are prepared and re-acquired
            // afterwards.
            drop(task);
            ctx.prepare_tasks(tasks, "prefetch");
            task = ctx.task(task_id, TaskDataCategory::All);
        }
        let in_progress = task.take_in_progress()?;
        let InProgressState::Scheduled { done_event, reason } = in_progress else {
            // Not scheduled: restore the state that was just taken and bail out.
            let old = task.set_in_progress(in_progress);
            debug_assert!(old.is_none(), "InProgress already exists");
            return None;
        };
        execution_reason = reason;
        let old = task.set_in_progress(InProgressState::InProgress(Box::new(
            InProgressStateInner {
                stale: false,
                once_task,
                done_event,
                session_dependent: false,
                marked_as_completed: false,
                new_children: Default::default(),
            },
        )));
        debug_assert!(old.is_none(), "InProgress already exists");

        // Snapshot of the collectibles; collected first so the task can be mutated in the
        // loop below.
        enum Collectible {
            Current(CollectibleRef, i32),
            Outdated(CollectibleRef),
        }
        let collectibles = task
            .iter_collectibles()
            .map(|(&collectible, &value)| Collectible::Current(collectible, value))
            .chain(
                task.iter_outdated_collectibles()
                    .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
            )
            .collect::<Vec<_>>();
        for collectible in collectibles {
            match collectible {
                Collectible::Current(collectible, value) => {
                    // Every current collectible becomes outdated with its current value.
                    let _ = task.insert_outdated_collectible(collectible, value);
                }
                Collectible::Outdated(collectible) => {
                    // Outdated entries without a matching current collectible are removed.
                    if task
                        .collectibles()
                        .is_none_or(|m| m.get(&collectible).is_none())
                    {
                        task.remove_outdated_collectibles(&collectible);
                    }
                }
            }
        }

        if self.should_track_dependencies() {
            let cell_dependencies = task.iter_cell_dependencies().collect();
            task.set_outdated_cell_dependencies(cell_dependencies);

            let outdated_output_dependencies = task.iter_output_dependencies().collect();
            task.set_outdated_output_dependencies(outdated_output_dependencies);
        }
    }

    let (span, future) = match task_type {
        TaskType::Cached(task_type) => {
            let CachedTaskType {
                native_fn,
                this,
                arg,
            } = &*task_type;
            (
                native_fn.span(task_id.persistence(), execution_reason, priority),
                native_fn.execute(*this, &**arg),
            )
        }
        TaskType::Transient(task_type) => {
            let span = tracing::trace_span!("turbo_tasks::root_task");
            let future = match &*task_type {
                TransientTask::Root(f) => f(),
                // `take` leaves `None` behind, so a once-future can only be obtained one time.
                TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
            };
            (span, future)
        }
    };
    Some(TaskExecutionSpec { future, span })
}
2003
/// Processes the completion of a task execution in several phases: prepare, invalidate output
/// dependents, mark unfinished children dirty, connect new children, finish, and clean up.
///
/// Returns `true` when one of the phases stops early (the prepare phase yields nothing, the
/// connect phase returns `true`, or the finish phase reports the task as stale); these are the
/// exits that are recorded as `stale` on the tracing span when `trace_task_details` is
/// enabled. Returns `false` when all phases ran to completion.
fn task_execution_completed(
    &self,
    task_id: TaskId,
    result: Result<RawVc, TurboTasksExecutionError>,
    cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
    #[cfg(feature = "verify_determinism")] stateful: bool,
    has_invalidator: bool,
    turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> bool {
    // Without `trace_task_details` the span only carries the `new_children` field.
    #[cfg(not(feature = "trace_task_details"))]
    let span = tracing::trace_span!(
        "task execution completed",
        new_children = tracing::field::Empty
    )
    .entered();
    #[cfg(feature = "trace_task_details")]
    let span = tracing::trace_span!(
        "task execution completed",
        task_id = display(task_id),
        result = match result.as_ref() {
            Ok(value) => display(either::Either::Left(value)),
            Err(err) => display(either::Either::Right(err)),
        },
        new_children = tracing::field::Empty,
        immutable = tracing::field::Empty,
        new_output = tracing::field::Empty,
        output_dependents = tracing::field::Empty,
        stale = tracing::field::Empty,
    )
    .entered();

    // Captured up front because `result` is moved into the prepare phase.
    let is_error = result.is_err();

    let mut ctx = self.execute_context(turbo_tasks);

    let Some(TaskExecutionCompletePrepareResult {
        new_children,
        is_now_immutable,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        output_dependent_tasks,
        is_recomputation,
    }) = self.task_execution_completed_prepare(
        &mut ctx,
        #[cfg(feature = "trace_task_details")]
        &span,
        task_id,
        result,
        cell_counters,
        #[cfg(feature = "verify_determinism")]
        stateful,
        has_invalidator,
    )
    else {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "prepare");
        return true;
    };

    #[cfg(feature = "trace_task_details")]
    span.record("new_output", new_output.is_some());
    #[cfg(feature = "trace_task_details")]
    span.record("output_dependents", output_dependent_tasks.len());

    if !output_dependent_tasks.is_empty() {
        self.task_execution_completed_invalidate_output_dependent(
            &mut ctx,
            task_id,
            output_dependent_tasks,
        );
    }

    let has_new_children = !new_children.is_empty();
    span.record("new_children", new_children.len());

    if has_new_children {
        self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
    }

    // The connect phase only runs when there are new children.
    if has_new_children
        && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
    {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "connect");
        return true;
    }

    let (stale, in_progress_cells) = self.task_execution_completed_finish(
        &mut ctx,
        task_id,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        is_now_immutable,
    );
    if stale {
        #[cfg(feature = "trace_task_details")]
        span.record("stale", "finish");
        return true;
    }

    let removed_data = self.task_execution_completed_cleanup(
        &mut ctx,
        task_id,
        cell_counters,
        is_error,
        is_recomputation,
    );

    // Both values are dropped explicitly here, after every phase has finished.
    drop(removed_data);
    drop(in_progress_cells);

    false
}
2141
/// First phase of completing a task execution.
///
/// While holding the task, this records the `stateful` / `has_invalidator` flags, updates the
/// per-cell-type max indices, collects the edges that became outdated, determines whether the
/// output changed, and gathers the tasks depending on the output. After releasing the task it
/// checks whether the task can now be considered immutable and runs the old-edge cleanup.
///
/// Returns `None` when the task turned out to be stale and was put back into the `Scheduled`
/// state (not compiled in with the `no_fast_stale` feature, and never for once-tasks).
/// A canceled task yields an "empty" result: no children, no new output, no dependents.
///
/// # Panics
///
/// Panics if the task has no in-progress state or is in the `Scheduled` state, and if the
/// execution returned a `RawVc::LocalOutput`.
fn task_execution_completed_prepare(
    &self,
    ctx: &mut impl ExecuteContext<'_>,
    #[cfg(feature = "trace_task_details")] span: &Span,
    task_id: TaskId,
    result: Result<RawVc, TurboTasksExecutionError>,
    cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
    #[cfg(feature = "verify_determinism")] stateful: bool,
    has_invalidator: bool,
) -> Option<TaskExecutionCompletePrepareResult> {
    let mut task = ctx.task(task_id, TaskDataCategory::All);
    // A task that is not dirty when it completes is treated as a recomputation.
    let is_recomputation = task.is_dirty().is_none();
    let Some(in_progress) = task.get_in_progress_mut() else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };
    if matches!(in_progress, InProgressState::Canceled) {
        // Canceled executions produce no changes.
        return Some(TaskExecutionCompletePrepareResult {
            new_children: Default::default(),
            is_now_immutable: false,
            #[cfg(feature = "verify_determinism")]
            no_output_set: false,
            new_output: None,
            output_dependent_tasks: Default::default(),
            is_recomputation,
        });
    }
    let &mut InProgressState::InProgress(box InProgressStateInner {
        stale,
        ref mut new_children,
        session_dependent,
        once_task: is_once_task,
        ..
    }) = in_progress
    else {
        panic!("Task execution completed, but task is not in progress: {task:#?}");
    };

    // Stale fast path: reschedule the task instead of processing the result.
    #[cfg(not(feature = "no_fast_stale"))]
    if stale && !is_once_task {
        let Some(InProgressState::InProgress(box InProgressStateInner {
            done_event,
            mut new_children,
            ..
        })) = task.take_in_progress()
        else {
            unreachable!();
        };
        let old = task.set_in_progress(InProgressState::Scheduled {
            done_event,
            reason: TaskExecutionReason::Stale,
        });
        debug_assert!(old.is_none(), "InProgress already exists");
        // Keep only the children that are not already children of the task.
        for task in task.iter_children() {
            new_children.remove(&task);
        }
        drop(task);

        // The remaining new children are discarded, so their active counts are decreased.
        AggregationUpdateQueue::run(
            AggregationUpdateJob::DecreaseActiveCounts {
                task_ids: new_children.into_iter().collect(),
            },
            ctx,
        );
        return None;
    }

    // Move the new children out of the in-progress state.
    let mut new_children = take(new_children);

    #[cfg(feature = "verify_determinism")]
    if stateful {
        task.set_stateful(true);
    }

    if has_invalidator {
        task.set_invalidator(true);
    }

    // Cell counters are only synchronized for successful executions or recomputations.
    if result.is_ok() || is_recomputation {
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        // Starts as a copy of the old counters; whatever is left after the loop below was not
        // reported by this execution and is removed.
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                // Only write when the value actually changed.
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }
    }

    let mut queue = AggregationUpdateQueue::new();

    let mut old_edges = Vec::new();

    let has_children = !new_children.is_empty();
    let is_immutable = task.immutable();
    // Candidate for becoming immutable: not immutable yet, not session dependent, without
    // invalidator and without collectibles dependencies. The dependency set is collected here
    // and checked after the task guard has been released.
    let task_dependencies_for_immutable =
        if !is_immutable
            && !session_dependent
            && !task.invalidator()
            && task.is_collectibles_dependencies_empty()
        {
            Some(
                task.iter_output_dependencies()
                    .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                    .collect::<FxHashSet<_>>(),
            )
        } else {
            None
        };

    if has_children {
        prepare_new_children(task_id, &mut task, &new_children, &mut queue);

        // Existing children that are not part of `new_children` become outdated edges. The
        // `remove` also strips already-connected children from `new_children`, leaving only
        // the ones that are actually new.
        old_edges.extend(
            task.iter_children()
                .filter(|task| !new_children.remove(task))
                .map(OutdatedEdge::Child),
        );
    } else {
        // No children reported: every existing child is outdated.
        old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
    }

    old_edges.extend(
        task.iter_outdated_collectibles()
            .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
    );

    if self.should_track_dependencies() {
        // Note: these iterate the *outdated* dependency sets, not the current ones.
        old_edges.extend(
            task.iter_outdated_cell_dependencies()
                .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
        );
        old_edges.extend(
            task.iter_outdated_output_dependencies()
                .map(OutdatedEdge::OutputDependency),
        );
    }

    let current_output = task.get_output();
    #[cfg(feature = "verify_determinism")]
    let no_output_set = current_output.is_none();
    // `new_output` is `None` when the result equals the output currently stored on the task.
    let new_output = match result {
        Ok(RawVc::TaskOutput(output_task_id)) => {
            if let Some(OutputValue::Output(current_task_id)) = current_output
                && *current_task_id == output_task_id
            {
                None
            } else {
                Some(OutputValue::Output(output_task_id))
            }
        }
        Ok(RawVc::TaskCell(output_task_id, cell)) => {
            if let Some(OutputValue::Cell(CellRef {
                task: current_task_id,
                cell: current_cell,
            })) = current_output
                && *current_task_id == output_task_id
                && *current_cell == cell
            {
                None
            } else {
                Some(OutputValue::Cell(CellRef {
                    task: output_task_id,
                    cell,
                }))
            }
        }
        Ok(RawVc::LocalOutput(..)) => {
            panic!("Non-local tasks must not return a local Vc");
        }
        Err(err) => {
            if let Some(OutputValue::Error(old_error)) = current_output
                && **old_error == err
            {
                None
            } else {
                Some(OutputValue::Error(Arc::new((&err).into())))
            }
        }
    };
    let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
    // Dependents are only needed when the output changed and dependencies are tracked.
    if new_output.is_some() && ctx.should_track_dependencies() {
        output_dependent_tasks = task.iter_output_dependent().collect();
    }

    // Release the task guard; the steps below access other tasks through `ctx`.
    drop(task);

    let mut is_now_immutable = false;
    // The task becomes immutable when every collected dependency is immutable (this is also
    // the case when the dependency set is empty).
    if let Some(dependencies) = task_dependencies_for_immutable
        && dependencies
            .iter()
            .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
    {
        is_now_immutable = true;
    }
    #[cfg(feature = "trace_task_details")]
    span.record("immutable", is_immutable || is_now_immutable);

    if !queue.is_empty() || !old_edges.is_empty() {
        #[cfg(feature = "trace_task_completion")]
        let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
        CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
    }

    Some(TaskExecutionCompletePrepareResult {
        new_children,
        is_now_immutable,
        #[cfg(feature = "verify_determinism")]
        no_output_set,
        new_output,
        output_dependent_tasks,
        is_recomputation,
    })
}
2401
    /// Marks the tasks that depend on this task's output as dirty.
    ///
    /// `output_dependent_tasks` must be non-empty (checked by a `debug_assert!`). With more than
    /// one dependent, all of them are passed to `ctx.prepare_tasks` before processing. When the
    /// number of dependents exceeds `DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD`, they are
    /// split into chunks and each chunk is handled by a closure spawned on the scope, using its
    /// own child context and aggregation queue; otherwise they are handled one after another on
    /// `ctx` with a single queue.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
                "invalidate output dependents",
            );
        }

        // Handles a single dependent: decides whether it has to be marked dirty and, if so,
        // records the resulting aggregation updates in `queue`.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            // Transient `Once` tasks are left untouched.
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                // The edge is only listed as outdated: the dependent is still marked dirty,
                // but without `make_stale`.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                // The dependent has no (current or outdated) edge back to this task, so there
                // is nothing to invalidate.
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            // Many dependents: process them chunk-wise, one spawned closure per chunk.
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Few dependents: process them inline and execute the queue once at the end.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2499
2500 fn task_execution_completed_unfinished_children_dirty(
2501 &self,
2502 ctx: &mut impl ExecuteContext<'_>,
2503 new_children: &FxHashSet<TaskId>,
2504 ) {
2505 debug_assert!(!new_children.is_empty());
2506
2507 let mut queue = AggregationUpdateQueue::new();
2508 ctx.for_each_task_all(
2509 new_children.iter().copied(),
2510 "unfinished children dirty",
2511 |child_task, ctx| {
2512 if !child_task.has_output() {
2513 let child_id = child_task.id();
2514 make_task_dirty_internal(
2515 child_task,
2516 child_id,
2517 false,
2518 #[cfg(feature = "trace_task_dirty")]
2519 TaskDirtyCause::InitialDirty,
2520 &mut queue,
2521 ctx,
2522 );
2523 }
2524 },
2525 );
2526
2527 queue.execute(ctx);
2528 }
2529
    /// Connects the children created during the execution to the task.
    ///
    /// Returns `true` when the task turned stale during execution (and is not a once task): the
    /// task is then put into the `Scheduled` state with reason `Stale` and the active counts of
    /// `new_children` are decreased instead of connecting them. Returns `false` when the
    /// execution was canceled or after the children were connected. The stale shortcut is
    /// compiled out when the `no_fast_stale` feature is enabled.
    ///
    /// # Panics
    ///
    /// Panics if the task has no in-progress state or is in a state other than `Canceled` or
    /// `InProgress`.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled execution must not connect anything.
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            // Swap the `InProgress` state for `Scheduled`, keeping the same `done_event`.
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Release the task guard before running the aggregation update.
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        // The task guard is handed over to `connect_children`.
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2597
    /// Finishes a task execution: stores the new output, updates the dirty state and leaves the
    /// in-progress state.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `(false, None)` when the execution was canceled.
    /// - `(true, None)` when the task turned stale (and is not a once task); the task is put
    ///   back into the `Scheduled` state with reason `Stale` and nothing else is changed.
    /// - otherwise the reschedule flag together with the in-progress cell states taken from the
    ///   task. Without the `verify_determinism` feature the flag is always `false` here.
    ///
    /// # Panics
    ///
    /// Panics if the task has no in-progress state or is in a state other than `Canceled` or
    /// `InProgress`.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // The new children are expected to have been taken out of the state already.
        debug_assert!(new_children.is_empty());

        if stale && !is_once_task {
            // Stale result: schedule the task again, reusing the same `done_event`.
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Keep the previous output around; it is dropped explicitly after the task guard.
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Notify the events of all in-progress cell states.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        let new_dirtyness = if session_dependent {
            Some(Dirtyness::SessionDependent)
        } else {
            None
        };
        #[cfg(feature = "verify_determinism")]
        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
        let data_update = task.update_dirty_state(new_dirtyness);

        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            // Release the task guard first, then notify `done_event`.
            drop(task);

            done_event.notify(usize::MAX);
        }

        // Dropped only after the task guard has been released.
        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2700
2701 fn task_execution_completed_cleanup(
2702 &self,
2703 ctx: &mut impl ExecuteContext<'_>,
2704 task_id: TaskId,
2705 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2706 is_error: bool,
2707 is_recomputation: bool,
2708 ) -> Vec<SharedReference> {
2709 let mut task = ctx.task(task_id, TaskDataCategory::All);
2710 let mut removed_cell_data = Vec::new();
2711 if !is_error || is_recomputation {
2717 let to_remove_persistent: Vec<_> = task
2724 .iter_persistent_cell_data()
2725 .filter_map(|(cell, _)| {
2726 cell_counters
2727 .get(&cell.type_id)
2728 .is_none_or(|start_index| cell.index >= *start_index)
2729 .then_some(*cell)
2730 })
2731 .collect();
2732
2733 let to_remove_transient: Vec<_> = task
2735 .iter_transient_cell_data()
2736 .filter_map(|(cell, _)| {
2737 cell_counters
2738 .get(&cell.type_id)
2739 .is_none_or(|start_index| cell.index >= *start_index)
2740 .then_some(*cell)
2741 })
2742 .collect();
2743 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2744 for cell in to_remove_persistent {
2745 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2746 removed_cell_data.push(data.into_untyped());
2747 }
2748 }
2749 for cell in to_remove_transient {
2750 if let Some(data) = task.remove_transient_cell_data(&cell) {
2751 removed_cell_data.push(data);
2752 }
2753 }
2754 let to_remove_hash: Vec<_> = task
2756 .iter_cell_data_hash()
2757 .filter_map(|(cell, _)| {
2758 cell_counters
2759 .get(&cell.type_id)
2760 .is_none_or(|start_index| cell.index >= *start_index)
2761 .then_some(*cell)
2762 })
2763 .collect();
2764 for cell in to_remove_hash {
2765 task.remove_cell_data_hash(&cell);
2766 }
2767 }
2768
2769 task.cleanup_after_execution();
2773
2774 drop(task);
2775
2776 removed_cell_data
2778 }
2779
2780 fn log_unrecoverable_persist_error() {
2783 eprintln!(
2784 "Persisting is disabled for this session due to an unrecoverable error. Stopping the \
2785 background persisting process."
2786 );
2787 }
2788
    /// Runs a background snapshot job and returns it as a boxed future.
    ///
    /// Both `InitialSnapshot` and `FollowUpSnapshot` follow the same loop:
    /// 1. Wait until the snapshot deadline (300s after the last snapshot for the initial job,
    ///    120s for follow-ups), until an idle timeout elapses while the system is idle, or
    ///    until stopping is signalled (which ends the job).
    /// 2. Call `snapshot_and_persist`. On error the job logs and ends.
    /// 3. Run up to `MAX_IDLE_COMPACTION_PASSES` compaction passes, stopping early when the
    ///    idle-end event fired, when `compact` returns `Ok(false)` or when it fails.
    /// 4. If the snapshot had no new data, loop again; otherwise store the snapshot time and
    ///    schedule a `FollowUpSnapshot` job, then end.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    // `self.last_snapshot` holds milliseconds relative to `self.start_time`.
                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    // Cleared after a snapshot without new data, so that an idle state that is
                    // still ongoing does not start another idle timeout.
                    let mut fresh_idle = true;
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            // Register the listener before checking the flag so a stop signal
                            // between the two is not missed.
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        // Idle started: (re)start the idle timeout.
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        // Idle ended: move the idle deadline past `until`.
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only snapshot early when the system is still idle.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        // Root span (no parent) for the snapshot and the compaction passes.
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        match this.snapshot_and_persist(background_span.id(), reason, turbo_tasks) {
                            Err(err) => {
                                eprintln!("Persisting failed: {err:?}");
                                Self::log_unrecoverable_persist_error();
                                return;
                            }
                            Ok((snapshot_start, new_data)) => {
                                last_snapshot = snapshot_start;

                                const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                                for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                    // Non-blocking check: `biased` polls the idle-end listener
                                    // first and falls through to the ready future otherwise.
                                    let idle_ended = tokio::select! {
                                        biased;
                                        _ = &mut idle_end_listener => {
                                            idle_end_listener = self.idle_end_event.listen();
                                            true
                                        },
                                        _ = std::future::ready(()) => false,
                                    };
                                    if idle_ended {
                                        break;
                                    }
                                    let _compact_span = tracing::info_span!(
                                        parent: background_span.id(),
                                        "compact database"
                                    )
                                    .entered();
                                    // NOTE(review): presumably `Ok(true)` means another pass may
                                    // still do work and `Ok(false)` means nothing is left —
                                    // confirm against `BackingStorage::compact`.
                                    match self.backing_storage.compact() {
                                        Ok(true) => {}
                                        Ok(false) => break,
                                        Err(err) => {
                                            eprintln!("Compaction failed: {err:?}");
                                            if self.backing_storage.has_unrecoverable_write_error()
                                            {
                                                Self::log_unrecoverable_persist_error();
                                                return;
                                            }
                                            break;
                                        }
                                    }
                                }

                                if !new_data {
                                    // Nothing was persisted: keep waiting in this job.
                                    fresh_idle = false;
                                    continue;
                                }
                                let last_snapshot = last_snapshot.duration_since(self.start_time);
                                self.last_snapshot.store(
                                    last_snapshot.as_millis().try_into().unwrap(),
                                    Ordering::Relaxed,
                                );

                                turbo_tasks.schedule_backend_background_job(
                                    TurboTasksBackendJob::FollowUpSnapshot,
                                );
                                return;
                            }
                        }
                    }
                }
            }
        })
    }
2932
2933 fn try_read_own_task_cell(
2934 &self,
2935 task_id: TaskId,
2936 cell: CellId,
2937 options: ReadCellOptions,
2938 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2939 ) -> Result<TypedCellContent> {
2940 let mut ctx = self.execute_context(turbo_tasks);
2941 let task = ctx.task(task_id, TaskDataCategory::Data);
2942 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2943 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2944 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2945 } else {
2946 Ok(CellContent(None).into_typed(cell.type_id))
2947 }
2948 }
2949
    /// Reads the collectibles of type `collectible_type` from `task_id`.
    ///
    /// Before reading, the task's aggregation number is raised (base `u32::MAX`) until
    /// `is_root_node` accepts it. The result maps each collectible (as a `RawVc::TaskCell`) to
    /// a count: every aggregated collectible with a positive count adds 1, every collectible
    /// stored directly on the task adds its own count.
    ///
    /// When `reader_id` is given, the reader is registered as a collectibles dependent on the
    /// task, and the matching dependency is recorded on the reader (or just removed from its
    /// outdated set if it was listed there).
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                // The guard is released while the aggregation update runs and re-acquired
                // afterwards, so the aggregation number is checked again.
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Aggregated collectibles: each one with a positive count contributes 1.
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // Collectibles stored on the task itself contribute their stored count.
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            // If the dependency was marked outdated, un-marking it is enough; otherwise add it.
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
3014
3015 fn emit_collectible(
3016 &self,
3017 collectible_type: TraitTypeId,
3018 collectible: RawVc,
3019 task_id: TaskId,
3020 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3021 ) {
3022 self.assert_valid_collectible(task_id, collectible);
3023
3024 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3025 panic!("Collectibles need to be resolved");
3026 };
3027 let cell = CellRef {
3028 task: collectible_task,
3029 cell,
3030 };
3031 operation::UpdateCollectibleOperation::run(
3032 task_id,
3033 CollectibleRef {
3034 collectible_type,
3035 cell,
3036 },
3037 1,
3038 self.execute_context(turbo_tasks),
3039 );
3040 }
3041
3042 fn unemit_collectible(
3043 &self,
3044 collectible_type: TraitTypeId,
3045 collectible: RawVc,
3046 count: u32,
3047 task_id: TaskId,
3048 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3049 ) {
3050 self.assert_valid_collectible(task_id, collectible);
3051
3052 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3053 panic!("Collectibles need to be resolved");
3054 };
3055 let cell = CellRef {
3056 task: collectible_task,
3057 cell,
3058 };
3059 operation::UpdateCollectibleOperation::run(
3060 task_id,
3061 CollectibleRef {
3062 collectible_type,
3063 cell,
3064 },
3065 -(i32::try_from(count).unwrap()),
3066 self.execute_context(turbo_tasks),
3067 );
3068 }
3069
    /// Updates the content of `cell` on `task_id` by running an `UpdateCellOperation` with a
    /// fresh execute context.
    ///
    /// All arguments are forwarded unchanged; note that the operation takes `content` before
    /// `is_serializable_cell_content`, i.e. in a different order than this function.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3092
    /// Flags the currently executing task as session dependent.
    ///
    /// Does nothing when dependency tracking is disabled. Otherwise the task's aggregation
    /// number is raised to at least `SESSION_DEPENDENT_AGGREGATION_NUMBER` and, if the task is
    /// in the `InProgress` state, its `session_dependent` flag is set.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // Release the guard while the aggregation update runs, then re-acquire it.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3128
3129 fn mark_own_task_as_finished(
3130 &self,
3131 task: TaskId,
3132 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3133 ) {
3134 let mut ctx = self.execute_context(turbo_tasks);
3135 let mut task = ctx.task(task, TaskDataCategory::Data);
3136 if let Some(InProgressState::InProgress(box InProgressStateInner {
3137 marked_as_completed,
3138 ..
3139 })) = task.get_in_progress_mut()
3140 {
3141 *marked_as_completed = true;
3142 }
3147 }
3148
    /// Connects `task` as a child of `parent_task` by running a `ConnectChildOperation`.
    ///
    /// # Panics
    ///
    /// Panics (via `assert_not_persistent_calling_transient`) when a non-transient parent is
    /// given together with a transient `task`.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3158
3159 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3160 let task_id = self.transient_task_id_factory.get();
3161 {
3162 let mut task = self.storage.access_mut(task_id);
3163 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3164 }
3165 #[cfg(feature = "verify_aggregation_graph")]
3166 self.root_tasks.lock().insert(task_id);
3167 task_id
3168 }
3169
3170 fn dispose_root_task(
3171 &self,
3172 task_id: TaskId,
3173 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3174 ) {
3175 #[cfg(feature = "verify_aggregation_graph")]
3176 self.root_tasks.lock().remove(&task_id);
3177
3178 let mut ctx = self.execute_context(turbo_tasks);
3179 let mut task = ctx.task(task_id, TaskDataCategory::All);
3180 let is_dirty = task.is_dirty();
3181 let has_dirty_containers = task.has_dirty_containers();
3182 if is_dirty.is_some() || has_dirty_containers {
3183 if let Some(activeness_state) = task.get_activeness_mut() {
3184 activeness_state.unset_root_type();
3186 activeness_state.set_active_until_clean();
3187 };
3188 } else if let Some(activeness_state) = task.take_activeness() {
3189 activeness_state.all_clean_event.notify(usize::MAX);
3192 }
3193 }
3194
3195 #[cfg(feature = "verify_aggregation_graph")]
3196 fn verify_aggregation_graph(
3197 &self,
3198 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3199 idle: bool,
3200 ) {
3201 if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
3202 return;
3203 }
3204 use std::{collections::VecDeque, env, io::stdout};
3205
3206 use crate::backend::operation::{get_uppers, is_aggregating_node};
3207
3208 let mut ctx = self.execute_context(turbo_tasks);
3209 let root_tasks = self.root_tasks.lock().clone();
3210
3211 for task_id in root_tasks.into_iter() {
3212 let mut queue = VecDeque::new();
3213 let mut visited = FxHashSet::default();
3214 let mut aggregated_nodes = FxHashSet::default();
3215 let mut collectibles = FxHashMap::default();
3216 let root_task_id = task_id;
3217 visited.insert(task_id);
3218 aggregated_nodes.insert(task_id);
3219 queue.push_back(task_id);
3220 let mut counter = 0;
3221 while let Some(task_id) = queue.pop_front() {
3222 counter += 1;
3223 if counter % 100000 == 0 {
3224 println!(
3225 "queue={}, visited={}, aggregated_nodes={}",
3226 queue.len(),
3227 visited.len(),
3228 aggregated_nodes.len()
3229 );
3230 }
3231 let task = ctx.task(task_id, TaskDataCategory::All);
3232 if idle && !self.is_idle.load(Ordering::Relaxed) {
3233 return;
3234 }
3235
3236 let uppers = get_uppers(&task);
3237 if task_id != root_task_id
3238 && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
3239 {
3240 panic!(
3241 "Task {} {} doesn't report to any root but is reachable from one (uppers: \
3242 {:?})",
3243 task_id,
3244 task.get_task_description(),
3245 uppers
3246 );
3247 }
3248
3249 for (collectible, _) in task.iter_aggregated_collectibles() {
3250 collectibles
3251 .entry(*collectible)
3252 .or_insert_with(|| (false, Vec::new()))
3253 .1
3254 .push(task_id);
3255 }
3256
3257 for (&collectible, &value) in task.iter_collectibles() {
3258 if value > 0 {
3259 if let Some((flag, _)) = collectibles.get_mut(&collectible) {
3260 *flag = true
3261 } else {
3262 panic!(
3263 "Task {} has a collectible {:?} that is not in any upper task",
3264 task_id, collectible
3265 );
3266 }
3267 }
3268 }
3269
3270 let is_dirty = task.has_dirty();
3271 let has_dirty_container = task.has_dirty_containers();
3272 let should_be_in_upper = is_dirty || has_dirty_container;
3273
3274 let aggregation_number = get_aggregation_number(&task);
3275 if is_aggregating_node(aggregation_number) {
3276 aggregated_nodes.insert(task_id);
3277 }
3278 for child_id in task.iter_children() {
3285 if visited.insert(child_id) {
3287 queue.push_back(child_id);
3288 }
3289 }
3290 drop(task);
3291
3292 if should_be_in_upper {
3293 for upper_id in uppers {
3294 let upper = ctx.task(upper_id, TaskDataCategory::All);
3295 let in_upper = upper
3296 .get_aggregated_dirty_containers(&task_id)
3297 .is_some_and(|&dirty| dirty > 0);
3298 if !in_upper {
3299 let containers: Vec<_> = upper
3300 .iter_aggregated_dirty_containers()
3301 .map(|(&k, &v)| (k, v))
3302 .collect();
3303 let upper_task_desc = upper.get_task_description();
3304 drop(upper);
3305 panic!(
3306 "Task {} ({}) is dirty, but is not listed in the upper task {} \
3307 ({})\nThese dirty containers are present:\n{:#?}",
3308 task_id,
3309 ctx.task(task_id, TaskDataCategory::Data)
3310 .get_task_description(),
3311 upper_id,
3312 upper_task_desc,
3313 containers,
3314 );
3315 }
3316 }
3317 }
3318 }
3319
3320 for (collectible, (flag, task_ids)) in collectibles {
3321 if !flag {
3322 use std::io::Write;
3323 let mut stdout = stdout().lock();
3324 writeln!(
3325 stdout,
3326 "{:?} that is not emitted in any child task but in these aggregated \
3327 tasks: {:#?}",
3328 collectible,
3329 task_ids
3330 .iter()
3331 .map(|t| format!(
3332 "{t} {}",
3333 ctx.task(*t, TaskDataCategory::Data).get_task_description()
3334 ))
3335 .collect::<Vec<_>>()
3336 )
3337 .unwrap();
3338
3339 let task_id = collectible.cell.task;
3340 let mut queue = {
3341 let task = ctx.task(task_id, TaskDataCategory::All);
3342 get_uppers(&task)
3343 };
3344 let mut visited = FxHashSet::default();
3345 for &upper_id in queue.iter() {
3346 visited.insert(upper_id);
3347 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3348 }
3349 while let Some(task_id) = queue.pop() {
3350 let task = ctx.task(task_id, TaskDataCategory::All);
3351 let desc = task.get_task_description();
3352 let aggregated_collectible = task
3353 .get_aggregated_collectibles(&collectible)
3354 .copied()
3355 .unwrap_or_default();
3356 let uppers = get_uppers(&task);
3357 drop(task);
3358 writeln!(
3359 stdout,
3360 "upper {task_id} {desc} collectible={aggregated_collectible}"
3361 )
3362 .unwrap();
3363 if task_ids.contains(&task_id) {
3364 writeln!(
3365 stdout,
3366 "Task has an upper connection to an aggregated task that doesn't \
3367 reference it. Upper connection is invalid!"
3368 )
3369 .unwrap();
3370 }
3371 for upper_id in uppers {
3372 writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
3373 if !visited.contains(&upper_id) {
3374 queue.push(upper_id);
3375 }
3376 }
3377 }
3378 panic!("See stdout for more details");
3379 }
3380 }
3381 }
3382 }
3383
3384 fn assert_not_persistent_calling_transient(
3385 &self,
3386 parent_id: Option<TaskId>,
3387 child_id: TaskId,
3388 cell_id: Option<CellId>,
3389 ) {
3390 if let Some(parent_id) = parent_id
3391 && !parent_id.is_transient()
3392 && child_id.is_transient()
3393 {
3394 self.panic_persistent_calling_transient(
3395 self.debug_get_task_description(parent_id),
3396 self.debug_get_cached_task_type(child_id).as_deref(),
3397 cell_id,
3398 );
3399 }
3400 }
3401
3402 fn panic_persistent_calling_transient(
3403 &self,
3404 parent: String,
3405 child: Option<&CachedTaskType>,
3406 cell_id: Option<CellId>,
3407 ) {
3408 let transient_reason = if let Some(child) = child {
3409 Cow::Owned(format!(
3410 " The callee is transient because it depends on:\n{}",
3411 self.debug_trace_transient_task(child, cell_id),
3412 ))
3413 } else {
3414 Cow::Borrowed("")
3415 };
3416 panic!(
3417 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3418 parent,
3419 child.map_or("unknown", |t| t.get_name()),
3420 transient_reason,
3421 );
3422 }
3423
3424 fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
3425 let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
3427 let task_info = if let Some(col_task_ty) = collectible
3429 .try_get_task_id()
3430 .map(|t| self.debug_get_task_description(t))
3431 {
3432 Cow::Owned(format!(" (return type of {col_task_ty})"))
3433 } else {
3434 Cow::Borrowed("")
3435 };
3436 panic!("Collectible{task_info} must be a ResolvedVc")
3437 };
3438 if col_task_id.is_transient() && !task_id.is_transient() {
3439 let transient_reason =
3440 if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
3441 Cow::Owned(format!(
3442 ". The collectible is transient because it depends on:\n{}",
3443 self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
3444 ))
3445 } else {
3446 Cow::Borrowed("")
3447 };
3448 panic!(
3450 "Collectible is transient, transient collectibles cannot be emitted from \
3451 persistent tasks{transient_reason}",
3452 )
3453 }
3454 }
3455}
3456
3457impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Forwards the startup notification to the inner backend (`self.0`).
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3461
    /// Forwards the stopping notification to the inner backend; the API handle is not used.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3465
    /// Forwards the stop notification to the inner backend (`self.0`).
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3469
    /// Forwards the idle-start notification to the inner backend (`self.0`).
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3473
    /// Forwards the idle-end notification to the inner backend; the API handle is not used.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3477
    /// Delegates to the inner backend's `get_or_create_persistent_task` and returns its task
    /// id.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }
3487
    /// Delegates to the inner backend's `get_or_create_transient_task` and returns its task id.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }
3497
    /// Delegates the invalidation of a single task to the inner backend.
    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }
3501
    /// Delegates the invalidation of a slice of tasks to the inner backend.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }
3505
3506 fn invalidate_tasks_set(
3507 &self,
3508 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3509 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3510 ) {
3511 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3512 }
3513
3514 fn invalidate_serialization(
3515 &self,
3516 task_id: TaskId,
3517 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3518 ) {
3519 self.0.invalidate_serialization(task_id, turbo_tasks);
3520 }
3521
3522 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3523 self.0.task_execution_canceled(task, turbo_tasks)
3524 }
3525
3526 fn try_start_task_execution(
3527 &self,
3528 task_id: TaskId,
3529 priority: TaskPriority,
3530 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3531 ) -> Option<TaskExecutionSpec<'_>> {
3532 self.0
3533 .try_start_task_execution(task_id, priority, turbo_tasks)
3534 }
3535
3536 fn task_execution_completed(
3537 &self,
3538 task_id: TaskId,
3539 result: Result<RawVc, TurboTasksExecutionError>,
3540 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3541 #[cfg(feature = "verify_determinism")] stateful: bool,
3542 has_invalidator: bool,
3543 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3544 ) -> bool {
3545 self.0.task_execution_completed(
3546 task_id,
3547 result,
3548 cell_counters,
3549 #[cfg(feature = "verify_determinism")]
3550 stateful,
3551 has_invalidator,
3552 turbo_tasks,
3553 )
3554 }
3555
3556 type BackendJob = TurboTasksBackendJob;
3557
3558 fn run_backend_job<'a>(
3559 &'a self,
3560 job: Self::BackendJob,
3561 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3562 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3563 self.0.run_backend_job(job, turbo_tasks)
3564 }
3565
3566 fn try_read_task_output(
3567 &self,
3568 task_id: TaskId,
3569 reader: Option<TaskId>,
3570 options: ReadOutputOptions,
3571 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3572 ) -> Result<Result<RawVc, EventListener>> {
3573 self.0
3574 .try_read_task_output(task_id, reader, options, turbo_tasks)
3575 }
3576
3577 fn try_read_task_cell(
3578 &self,
3579 task_id: TaskId,
3580 cell: CellId,
3581 reader: Option<TaskId>,
3582 options: ReadCellOptions,
3583 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3584 ) -> Result<Result<TypedCellContent, EventListener>> {
3585 self.0
3586 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3587 }
3588
3589 fn try_read_own_task_cell(
3590 &self,
3591 task_id: TaskId,
3592 cell: CellId,
3593 options: ReadCellOptions,
3594 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3595 ) -> Result<TypedCellContent> {
3596 self.0
3597 .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3598 }
3599
3600 fn read_task_collectibles(
3601 &self,
3602 task_id: TaskId,
3603 collectible_type: TraitTypeId,
3604 reader: Option<TaskId>,
3605 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3606 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3607 self.0
3608 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3609 }
3610
3611 fn emit_collectible(
3612 &self,
3613 collectible_type: TraitTypeId,
3614 collectible: RawVc,
3615 task_id: TaskId,
3616 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3617 ) {
3618 self.0
3619 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3620 }
3621
3622 fn unemit_collectible(
3623 &self,
3624 collectible_type: TraitTypeId,
3625 collectible: RawVc,
3626 count: u32,
3627 task_id: TaskId,
3628 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3629 ) {
3630 self.0
3631 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3632 }
3633
3634 fn update_task_cell(
3635 &self,
3636 task_id: TaskId,
3637 cell: CellId,
3638 is_serializable_cell_content: bool,
3639 content: CellContent,
3640 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3641 content_hash: Option<CellHash>,
3642 verification_mode: VerificationMode,
3643 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3644 ) {
3645 self.0.update_task_cell(
3646 task_id,
3647 cell,
3648 is_serializable_cell_content,
3649 content,
3650 updated_key_hashes,
3651 content_hash,
3652 verification_mode,
3653 turbo_tasks,
3654 );
3655 }
3656
3657 fn mark_own_task_as_finished(
3658 &self,
3659 task_id: TaskId,
3660 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3661 ) {
3662 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3663 }
3664
3665 fn mark_own_task_as_session_dependent(
3666 &self,
3667 task: TaskId,
3668 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3669 ) {
3670 self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3671 }
3672
3673 fn connect_task(
3674 &self,
3675 task: TaskId,
3676 parent_task: Option<TaskId>,
3677 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3678 ) {
3679 self.0.connect_task(task, parent_task, turbo_tasks);
3680 }
3681
3682 fn create_transient_task(
3683 &self,
3684 task_type: TransientTaskType,
3685 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3686 ) -> TaskId {
3687 self.0.create_transient_task(task_type)
3688 }
3689
3690 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3691 self.0.dispose_root_task(task_id, turbo_tasks);
3692 }
3693
3694 fn task_statistics(&self) -> &TaskStatisticsApi {
3695 &self.0.task_statistics
3696 }
3697
3698 fn is_tracking_dependencies(&self) -> bool {
3699 self.0.options.dependency_tracking
3700 }
3701
3702 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3703 self.0.get_task_name(task, turbo_tasks)
3704 }
3705}
3706
/// Node of a tree that is rendered (via `fmt_indented` / `Display`) as an
/// indented, human-readable trace, one line per node.
///
/// NOTE(review): presumably this is the value produced by
/// `debug_trace_transient_task` to explain why something is transient —
/// confirm against that function.
enum DebugTraceTransientTask {
    /// A task with a known name. This is the only variant whose rendering
    /// recurses into child nodes.
    Cached {
        /// Name written at the start of this node's line.
        task_name: &'static str,
        /// When present, rendered after the name as a
        /// `(read cell of type …)` suffix using the value type's global name.
        cell_type_id: Option<ValueTypeId>,
        /// Optional child node rendered beneath a `self:` label.
        cause_self: Option<Box<DebugTraceTransientTask>>,
        /// Child nodes rendered beneath an `args:` label; the label is
        /// omitted when this list is empty.
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A task with a known name that is rendered on a single line with a
    /// `(collapsed)` suffix and no children.
    Collapsed {
        /// Name written at the start of this node's line.
        task_name: &'static str,
        /// When present, rendered after the name as a
        /// `(read cell of type …)` suffix.
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task without a known name; rendered as `unknown transient task`.
    Uncached {
        /// When present, rendered after the placeholder text as a
        /// `(read cell of type …)` suffix.
        cell_type_id: Option<ValueTypeId>,
    },
}
3723
3724impl DebugTraceTransientTask {
3725 fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
3726 let indent = " ".repeat(level);
3727 f.write_str(&indent)?;
3728
3729 fn fmt_cell_type_id(
3730 f: &mut fmt::Formatter<'_>,
3731 cell_type_id: Option<ValueTypeId>,
3732 ) -> fmt::Result {
3733 if let Some(ty) = cell_type_id {
3734 write!(
3735 f,
3736 " (read cell of type {})",
3737 get_value_type(ty).ty.global_name
3738 )
3739 } else {
3740 Ok(())
3741 }
3742 }
3743
3744 match self {
3746 Self::Cached {
3747 task_name,
3748 cell_type_id,
3749 ..
3750 }
3751 | Self::Collapsed {
3752 task_name,
3753 cell_type_id,
3754 ..
3755 } => {
3756 f.write_str(task_name)?;
3757 fmt_cell_type_id(f, *cell_type_id)?;
3758 if matches!(self, Self::Collapsed { .. }) {
3759 f.write_str(" (collapsed)")?;
3760 }
3761 }
3762 Self::Uncached { cell_type_id } => {
3763 f.write_str("unknown transient task")?;
3764 fmt_cell_type_id(f, *cell_type_id)?;
3765 }
3766 }
3767 f.write_char('\n')?;
3768
3769 if let Self::Cached {
3771 cause_self,
3772 cause_args,
3773 ..
3774 } = self
3775 {
3776 if let Some(c) = cause_self {
3777 writeln!(f, "{indent} self:")?;
3778 c.fmt_indented(f, level + 1)?;
3779 }
3780 if !cause_args.is_empty() {
3781 writeln!(f, "{indent} args:")?;
3782 for c in cause_args {
3783 c.fmt_indented(f, level + 1)?;
3784 }
3785 }
3786 }
3787 Ok(())
3788 }
3789}
3790
3791impl fmt::Display for DebugTraceTransientTask {
3792 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3793 self.fmt_indented(f, 0)
3794 }
3795}
3796
/// Returns an [`Instant`] roughly 30 years (30 × 365 days) after the moment
/// of the call.
///
/// NOTE(review): presumably used as a stand-in for "never", with the horizon
/// kept finite so that `Instant + Duration` does not overflow — confirm with
/// the callers.
fn far_future() -> Instant {
    const SECONDS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS_AHEAD: u64 = 30;

    let horizon = Duration::from_secs(SECONDS_PER_DAY * DAYS_PER_YEAR * YEARS_AHEAD);
    Instant::now() + horizon
}
3805
3806fn encode_task_data(
3818 task: TaskId,
3819 data: &TaskStorage,
3820 category: SpecificTaskDataCategory,
3821 scratch_buffer: &mut TurboBincodeBuffer,
3822) -> Result<TurboBincodeBuffer> {
3823 scratch_buffer.clear();
3824 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3825 data.encode(category, &mut encoder)?;
3826
3827 if cfg!(feature = "verify_serialization") {
3828 TaskStorage::new()
3829 .decode(
3830 category,
3831 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3832 )
3833 .with_context(|| {
3834 format!(
3835 "expected to be able to decode serialized data for '{category:?}' information \
3836 for {task}"
3837 )
3838 })?;
3839 }
3840 Ok(SmallVec::from_slice(scratch_buffer))
3841}