1mod counter_map;
2mod operation;
3mod storage;
4pub mod storage_schema;
5
6use std::{
7 borrow::Cow,
8 fmt::{self, Write},
9 future::Future,
10 hash::BuildHasherDefault,
11 mem::take,
12 pin::Pin,
13 sync::{
14 Arc, LazyLock,
15 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
16 },
17 time::SystemTime,
18};
19
20use anyhow::{Context, Result, bail};
21use auto_hash_map::{AutoMap, AutoSet};
22use indexmap::IndexSet;
23use parking_lot::{Condvar, Mutex};
24use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
25use smallvec::{SmallVec, smallvec};
26use tokio::time::{Duration, Instant};
27use tracing::{Span, trace_span};
28use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
29use turbo_tasks::{
30 CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
31 ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
32 TaskId, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic, ValueTypeId,
33 backend::{
34 Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
35 TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
36 TurboTasksExecutionError, TurboTasksExecutionErrorMessage, TypedCellContent,
37 VerificationMode,
38 },
39 event::{Event, EventDescription, EventListener},
40 message_queue::{TimingEvent, TraceEvent},
41 registry::get_value_type,
42 scope::scope_and_block,
43 task_statistics::TaskStatisticsApi,
44 trace::TraceRawVcs,
45 util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
46};
47
48pub use self::{
49 operation::AnyOperation,
50 storage::{SpecificTaskDataCategory, TaskDataCategory},
51};
52#[cfg(feature = "trace_task_dirty")]
53use crate::backend::operation::TaskDirtyCause;
54use crate::{
55 backend::{
56 operation::{
57 AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
58 CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
59 ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
60 TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
61 is_root_node, make_task_dirty_internal, prepare_new_children,
62 },
63 storage::Storage,
64 storage_schema::{TaskStorage, TaskStorageAccessors},
65 },
66 backing_storage::{BackingStorage, SnapshotItem},
67 data::{
68 ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
69 InProgressState, InProgressStateInner, OutputValue, TransientTask,
70 },
71 error::TaskError,
72 utils::{
73 arc_or_owned::ArcOrOwned,
74 chunked_vec::ChunkedVec,
75 dash_map_drop_contents::drop_contents,
76 dash_map_raw_entry::{RawEntry, raw_entry},
77 ptr_eq_arc::PtrEqArc,
78 shard_amount::compute_shard_amount,
79 sharded::Sharded,
80 swap_retain,
81 },
82};
83
/// Threshold above which dirtying dependent tasks is split into parallel
/// chunks. NOTE(review): the name contains a typo ("PARALLIZATION"); kept
/// as-is since other parts of the file may reference it.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

/// High bit of `in_progress_operations`: while set, a snapshot has been
/// requested and operations must suspend (see `start_operation` and
/// `operation_suspend_point`). The remaining bits count active operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

/// Idle time to wait before taking a snapshot. Overridable via the
/// `TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS` environment variable;
/// defaults to 2 seconds when unset or unparsable.
static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    std::env::var("TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(Duration::from_secs(2))
});
100
/// Coordination state for an in-flight snapshot. Lives behind the
/// `snapshot_request` mutex on `TurboTasksBackendInner` and is used together
/// with the `operations_suspended` / `snapshot_completed` condvars.
struct SnapshotRequest {
    /// True while a snapshot is being taken; operations suspend themselves
    /// until this flips back to false (signalled via `snapshot_completed`).
    snapshot_requested: bool,
    /// Operations that suspended themselves for the snapshot; collected by
    /// `snapshot_and_persist` while the snapshot is taken.
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}
105
106impl SnapshotRequest {
107 fn new() -> Self {
108 Self {
109 snapshot_requested: false,
110 suspended_operations: FxHashSet::default(),
111 }
112 }
113}
114
/// How the backend interacts with its backing storage.
/// Both `ReadWrite*` variants enable persistence (`should_persist`); any
/// `Some(..)` mode enables restoring (`should_restore`).
pub enum StorageMode {
    /// Data is restored from storage but nothing new is persisted.
    ReadOnly,
    /// Data is restored and persisted during runtime.
    ReadWrite,
    /// Data is restored and persisted; judging by the name, persistence is
    /// deferred until shutdown — NOTE(review): confirm against the snapshot
    /// scheduling code elsewhere in this file.
    ReadWriteOnShutdown,
}
125
/// Configuration for `TurboTasksBackend`.
pub struct BackendOptions {
    /// Whether to track dependency edges between tasks. Disabling this also
    /// disables active tracking (enforced in `TurboTasksBackendInner::new`).
    pub dependency_tracking: bool,

    /// Whether to track task activeness. Ignored (forced to false) when
    /// `dependency_tracking` is false.
    pub active_tracking: bool,

    /// Interaction mode with the backing storage; `None` disables both
    /// restoring and persisting.
    pub storage_mode: Option<StorageMode>,

    /// Number of workers; feeds into `compute_shard_amount` to size internal
    /// sharded data structures. `None` lets the backend decide.
    pub num_workers: Option<usize>,

    /// Use smaller initial capacities for internal data structures (also
    /// passed to `compute_shard_amount` and `Storage::new`).
    pub small_preallocation: bool,
}
150
151impl Default for BackendOptions {
152 fn default() -> Self {
153 Self {
154 dependency_tracking: true,
155 active_tracking: true,
156 storage_mode: Some(StorageMode::ReadWrite),
157 num_workers: None,
158 small_preallocation: false,
159 }
160 }
161}
162
/// Background jobs the backend can schedule. Variant semantics inferred from
/// names — the job handler lives elsewhere in this file; NOTE(review): verify
/// against the `run_backend_job` implementation.
pub enum TurboTasksBackendJob {
    /// Presumably the first snapshot after startup.
    InitialSnapshot,
    /// Presumably subsequent periodic snapshots.
    FollowUpSnapshot,
}
167
/// Public backend type; a thin wrapper sharing `TurboTasksBackendInner`
/// behind an `Arc` so background jobs and listeners can hold references.
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

/// Sharded log of newly cached task types with their assigned ids; drained
/// during `snapshot_and_persist` (see `persisted_task_cache_log`).
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171
/// Shared backend state; wrapped in an `Arc` by `TurboTasksBackend`.
struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    /// Backend creation time, used as a monotonic baseline.
    start_time: Instant,

    /// Id factory for persisted tasks (ids below `TRANSIENT_TASK_BIT`),
    /// starting at the backing storage's next free id.
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for transient tasks (ids with `TRANSIENT_TASK_BIT` set).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    /// Log of newly cached task types to persist; only present when a
    /// persisting storage mode is configured.
    persisted_task_cache_log: Option<TaskCacheLog>,
    /// In-memory map from cached task type to its assigned task id.
    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

    /// In-memory task data storage.
    storage: Storage,

    /// True when the backing storage had already handed out task ids before
    /// this session, i.e. local state is only a partial view of the
    /// persisted graph.
    local_is_partial: bool,

    /// Count of currently running operations; `SNAPSHOT_REQUESTED_BIT`
    /// doubles as the "snapshot pending" flag (see `start_operation` and
    /// `operation_suspend_point`).
    in_progress_operations: AtomicUsize,

    /// Snapshot coordination state; its mutex is shared by the two condvars
    /// below.
    snapshot_request: Mutex<SnapshotRequest>,
    /// Notified when the last active operation suspends for a snapshot.
    operations_suspended: Condvar,
    /// Notified when a snapshot completed and operations may resume.
    snapshot_completed: Condvar,
    /// Timestamp of the last snapshot — NOTE(review): encoding (presumably
    /// relative to `start_time`) is set elsewhere in the file; confirm there.
    last_snapshot: AtomicU64,

    /// Set when the backend starts shutting down.
    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    /// Optional per-native-function cache hit/miss statistics.
    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}
221
222impl<B: BackingStorage> TurboTasksBackend<B> {
223 pub fn new(options: BackendOptions, backing_storage: B) -> Self {
224 Self(Arc::new(TurboTasksBackendInner::new(
225 options,
226 backing_storage,
227 )))
228 }
229
230 pub fn backing_storage(&self) -> &B {
231 &self.0.backing_storage
232 }
233}
234
/// Core implementation; snapshot coordination here is order-sensitive
/// (atomics + condvars), so the code is documented rather than restructured.
impl<B: BackingStorage> TurboTasksBackendInner<B> {
    /// Creates the inner backend state.
    ///
    /// Task ids are split into two ranges: persisted tasks continue from the
    /// backing storage's next free id up to (excluding) `TRANSIENT_TASK_BIT`;
    /// transient tasks occupy the range at/above that bit.
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
        // The task cache log is only needed when data will be persisted.
        let need_log = matches!(
            options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        );
        // Activeness tracking depends on dependency tracking; force it off.
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: FxDashMap::default(),
            // If storage already assigned ids, local state is a partial view
            // of the persisted graph.
            local_is_partial: next_task_id != TaskId::MIN,
            storage: Storage::new(shard_amount, small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    /// Creates a fresh execute context bound to this backend and the given
    /// turbo-tasks API handle.
    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    /// True when a snapshot is pending and operations should suspend at their
    /// next suspend point. Always false when not persisting.
    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

    /// Cooperative suspend point for long-running operations.
    ///
    /// Fast path is a single relaxed load (`suspending_requested`). On the
    /// cold path the operation captures its resumable state via `suspend`,
    /// registers it, parks until the snapshot completes, then re-registers as
    /// active and removes its suspended state.
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            // Re-check under the lock; the snapshot may already be done.
            if snapshot_request.snapshot_requested {
                // Make the suspended state visible to the snapshot.
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    // Last active operation suspended: wake the snapshotter.
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

    /// Begins an operation, returning a guard that keeps
    /// `in_progress_operations` incremented until dropped.
    ///
    /// If a snapshot is currently requested, the increment is rolled back and
    /// the caller blocks until the snapshot completes, so operations never
    /// run concurrently with a snapshot. When not persisting, no coordination
    /// is needed and an empty guard is returned.
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            // Re-check under the lock; the snapshot may already be done.
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    /// Whether task data is written back to the backing storage.
    fn should_persist(&self) -> bool {
        matches!(
            self.options.storage_mode,
            Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
        )
    }

    /// Whether task data is restored from the backing storage.
    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    /// Whether dependency edges between tasks are tracked.
    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    /// Whether task activeness is tracked (forced off in `new` when
    /// dependency tracking is disabled).
    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    /// Records a task cache hit in the (optional) statistics API.
    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    /// Records a task cache miss in the (optional) statistics API.
    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }

    /// Converts a stored `TaskError` into a `TurboTasksExecutionError`,
    /// recursively converting error sources.
    ///
    /// For `TaskChain` errors the concrete error is read from the last
    /// chained task's current output, then wrapped in one `TaskContext` layer
    /// per task in the chain (iterated in reverse). If that output no longer
    /// holds an error, a "no longer available" placeholder panic is used.
    fn task_error_to_turbo_tasks_execution_error(
        &self,
        error: &TaskError,
        ctx: &mut impl ExecuteContext<'_>,
    ) -> TurboTasksExecutionError {
        match error {
            TaskError::Panic(panic) => TurboTasksExecutionError::Panic(panic.clone()),
            TaskError::Error(item) => TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
                message: item.message.clone(),
                source: item
                    .source
                    .as_ref()
                    .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
            })),
            TaskError::LocalTaskContext(local_task_context) => {
                TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
                    name: local_task_context.name.clone(),
                    source: local_task_context
                        .source
                        .as_ref()
                        .map(|e| self.task_error_to_turbo_tasks_execution_error(e, ctx)),
                }))
            }
            TaskError::TaskChain(chain) => {
                let task_id = chain.last().unwrap();
                let error = {
                    let task = ctx.task(*task_id, TaskDataCategory::Meta);
                    if let Some(OutputValue::Error(error)) = task.get_output() {
                        Some(error.clone())
                    } else {
                        None
                    }
                };
                let error = error.map_or_else(
                    || {
                        // The output no longer carries the error (presumably
                        // the task was recomputed meanwhile).
                        TurboTasksExecutionError::Panic(Arc::new(TurboTasksPanic {
                            message: TurboTasksExecutionErrorMessage::PIISafe(Cow::Borrowed(
                                "Error no longer available",
                            )),
                            location: None,
                        }))
                    },
                    |e| self.task_error_to_turbo_tasks_execution_error(&e, ctx),
                );
                let mut current_error = error;
                for &task_id in chain.iter().rev() {
                    current_error =
                        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
                            task_id,
                            source: Some(current_error),
                            turbo_tasks: ctx.turbo_tasks(),
                        }));
                }
                current_error
            }
        }
    }
}
443
/// RAII guard for an in-flight operation. Its `Drop` impl decrements
/// `in_progress_operations` and wakes a waiting snapshot when this was the
/// last active operation. `backend` is `None` when persistence is disabled
/// and no snapshot coordination is needed.
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}
447
448impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
449 fn drop(&mut self) {
450 if let Some(backend) = self.backend {
451 let fetch_sub = backend
452 .in_progress_operations
453 .fetch_sub(1, Ordering::AcqRel);
454 if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
455 backend.operations_suspended.notify_all();
456 }
457 }
458 }
459}
460
/// Intermediate result prepared when a task execution completes.
/// NOTE(review): consumed by the execution-completed path which is not
/// visible here; field meanings below are inferred from names — confirm
/// against the consumer.
struct TaskExecutionCompletePrepareResult {
    /// Presumably the children connected during this execution.
    pub new_children: FxHashSet<TaskId>,
    /// Presumably whether the task may now be treated as immutable.
    pub is_now_immutable: bool,
    #[cfg(feature = "verify_determinism")]
    pub no_output_set: bool,
    /// New output value, if any.
    pub new_output: Option<OutputValue>,
    /// Tasks that depend on this task's output.
    pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}
470
471impl<B: BackingStorage> TurboTasksBackendInner<B> {
473 fn connect_child(
474 &self,
475 parent_task: Option<TaskId>,
476 child_task: TaskId,
477 task_type: Option<ArcOrOwned<CachedTaskType>>,
478 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479 ) {
480 operation::ConnectChildOperation::run(
481 parent_task,
482 child_task,
483 task_type,
484 self.execute_context(turbo_tasks),
485 );
486 }
487
488 fn try_read_task_output(
489 self: &Arc<Self>,
490 task_id: TaskId,
491 reader: Option<TaskId>,
492 options: ReadOutputOptions,
493 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
494 ) -> Result<Result<RawVc, EventListener>> {
495 self.assert_not_persistent_calling_transient(reader, task_id, None);
496
497 let mut ctx = self.execute_context(turbo_tasks);
498 let need_reader_task = if self.should_track_dependencies()
499 && !matches!(options.tracking, ReadTracking::Untracked)
500 && reader.is_some_and(|reader_id| reader_id != task_id)
501 && let Some(reader_id) = reader
502 && reader_id != task_id
503 {
504 Some(reader_id)
505 } else {
506 None
507 };
508 let (mut task, mut reader_task) = if let Some(reader_id) = need_reader_task {
509 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
513 (task, Some(reader))
514 } else {
515 (ctx.task(task_id, TaskDataCategory::All), None)
516 };
517
518 fn listen_to_done_event(
519 reader_description: Option<EventDescription>,
520 tracking: ReadTracking,
521 done_event: &Event,
522 ) -> EventListener {
523 done_event.listen_with_note(move || {
524 move || {
525 if let Some(reader_description) = reader_description.as_ref() {
526 format!(
527 "try_read_task_output from {} ({})",
528 reader_description, tracking
529 )
530 } else {
531 format!("try_read_task_output ({})", tracking)
532 }
533 }
534 })
535 }
536
537 fn check_in_progress(
538 task: &impl TaskGuard,
539 reader_description: Option<EventDescription>,
540 tracking: ReadTracking,
541 ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
542 {
543 match task.get_in_progress() {
544 Some(InProgressState::Scheduled { done_event, .. }) => Some(Ok(Err(
545 listen_to_done_event(reader_description, tracking, done_event),
546 ))),
547 Some(InProgressState::InProgress(box InProgressStateInner {
548 done_event, ..
549 })) => Some(Ok(Err(listen_to_done_event(
550 reader_description,
551 tracking,
552 done_event,
553 )))),
554 Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
555 "{} was canceled",
556 task.get_task_description()
557 ))),
558 None => None,
559 }
560 }
561
562 if matches!(options.consistency, ReadConsistency::Strong) {
563 loop {
565 let aggregation_number = get_aggregation_number(&task);
566 if is_root_node(aggregation_number) {
567 break;
568 }
569 drop(task);
570 drop(reader_task);
571 {
572 let _span = tracing::trace_span!(
573 "make root node for strongly consistent read",
574 %task_id
575 )
576 .entered();
577 AggregationUpdateQueue::run(
578 AggregationUpdateJob::UpdateAggregationNumber {
579 task_id,
580 base_aggregation_number: u32::MAX,
581 distance: None,
582 },
583 &mut ctx,
584 );
585 }
586 (task, reader_task) = if let Some(reader_id) = need_reader_task {
587 let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
589 (task, Some(reader))
590 } else {
591 (ctx.task(task_id, TaskDataCategory::All), None)
592 }
593 }
594
595 let is_dirty = task.is_dirty();
596
597 let has_dirty_containers = task.has_dirty_containers();
599 if has_dirty_containers || is_dirty.is_some() {
600 let activeness = task.get_activeness_mut();
601 let mut task_ids_to_schedule: Vec<_> = Vec::new();
602 let activeness = if let Some(activeness) = activeness {
604 activeness.set_active_until_clean();
608 activeness
609 } else {
610 if ctx.should_track_activeness() {
614 task_ids_to_schedule = task.dirty_containers().collect();
616 task_ids_to_schedule.push(task_id);
617 }
618 let activeness =
619 task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
620 activeness.set_active_until_clean();
621 activeness
622 };
623 let listener = activeness.all_clean_event.listen_with_note(move || {
624 let this = self.clone();
625 let tt = turbo_tasks.pin();
626 move || {
627 let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
628 let mut ctx = this.execute_context(tt);
629 let mut visited = FxHashSet::default();
630 fn indent(s: &str) -> String {
631 s.split_inclusive('\n')
632 .flat_map(|line: &str| [" ", line].into_iter())
633 .collect::<String>()
634 }
635 fn get_info(
636 ctx: &mut impl ExecuteContext<'_>,
637 task_id: TaskId,
638 parent_and_count: Option<(TaskId, i32)>,
639 visited: &mut FxHashSet<TaskId>,
640 ) -> String {
641 let task = ctx.task(task_id, TaskDataCategory::All);
642 let is_dirty = task.is_dirty();
643 let in_progress =
644 task.get_in_progress()
645 .map_or("not in progress", |p| match p {
646 InProgressState::InProgress(_) => "in progress",
647 InProgressState::Scheduled { .. } => "scheduled",
648 InProgressState::Canceled => "canceled",
649 });
650 let activeness = task.get_activeness().map_or_else(
651 || "not active".to_string(),
652 |activeness| format!("{activeness:?}"),
653 );
654 let aggregation_number = get_aggregation_number(&task);
655 let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
656 {
657 let uppers = get_uppers(&task);
658 !uppers.contains(&parent_task_id)
659 } else {
660 false
661 };
662
663 let has_dirty_containers = task.has_dirty_containers();
665
666 let task_description = task.get_task_description();
667 let is_dirty_label = if let Some(parent_priority) = is_dirty {
668 format!(", dirty({parent_priority})")
669 } else {
670 String::new()
671 };
672 let has_dirty_containers_label = if has_dirty_containers {
673 ", dirty containers"
674 } else {
675 ""
676 };
677 let count = if let Some((_, count)) = parent_and_count {
678 format!(" {count}")
679 } else {
680 String::new()
681 };
682 let mut info = format!(
683 "{task_id} {task_description}{count} (aggr={aggregation_number}, \
684 {in_progress}, \
685 {activeness}{is_dirty_label}{has_dirty_containers_label})",
686 );
687 let children: Vec<_> = task.dirty_containers_with_count().collect();
688 drop(task);
689
690 if missing_upper {
691 info.push_str("\n ERROR: missing upper connection");
692 }
693
694 if has_dirty_containers || !children.is_empty() {
695 writeln!(info, "\n dirty tasks:").unwrap();
696
697 for (child_task_id, count) in children {
698 let task_description = ctx
699 .task(child_task_id, TaskDataCategory::Data)
700 .get_task_description();
701 if visited.insert(child_task_id) {
702 let child_info = get_info(
703 ctx,
704 child_task_id,
705 Some((task_id, count)),
706 visited,
707 );
708 info.push_str(&indent(&child_info));
709 if !info.ends_with('\n') {
710 info.push('\n');
711 }
712 } else {
713 writeln!(
714 info,
715 " {child_task_id} {task_description} {count} \
716 (already visited)"
717 )
718 .unwrap();
719 }
720 }
721 }
722 info
723 }
724 let info = get_info(&mut ctx, task_id, None, &mut visited);
725 format!(
726 "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
727 )
728 }
729 });
730 drop(reader_task);
731 drop(task);
732 if !task_ids_to_schedule.is_empty() {
733 let mut queue = AggregationUpdateQueue::new();
734 queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
735 queue.execute(&mut ctx);
736 }
737
738 return Ok(Err(listener));
739 }
740 }
741
742 let reader_description = reader_task
743 .as_ref()
744 .map(|r| EventDescription::new(|| r.get_task_desc_fn()));
745 if let Some(value) = check_in_progress(&task, reader_description.clone(), options.tracking)
746 {
747 return value;
748 }
749
750 if let Some(output) = task.get_output() {
751 let result = match output {
752 OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
753 OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
754 OutputValue::Error(error) => Err(error.clone()),
755 };
756 if let Some(mut reader_task) = reader_task.take()
757 && options.tracking.should_track(result.is_err())
758 && (!task.immutable() || cfg!(feature = "verify_immutable"))
759 {
760 #[cfg(feature = "trace_task_output_dependencies")]
761 let _span = tracing::trace_span!(
762 "add output dependency",
763 task = %task_id,
764 dependent_task = ?reader
765 )
766 .entered();
767 let mut queue = LeafDistanceUpdateQueue::new();
768 let reader = reader.unwrap();
769 if task.add_output_dependent(reader) {
770 let leaf_distance = task.get_leaf_distance().copied().unwrap_or_default();
772 let reader_leaf_distance =
773 reader_task.get_leaf_distance().copied().unwrap_or_default();
774 if reader_leaf_distance.distance <= leaf_distance.distance {
775 queue.push(
776 reader,
777 leaf_distance.distance,
778 leaf_distance.max_distance_in_buffer,
779 );
780 }
781 }
782
783 drop(task);
784
785 if !reader_task.remove_outdated_output_dependencies(&task_id) {
791 let _ = reader_task.add_output_dependencies(task_id);
792 }
793 drop(reader_task);
794
795 queue.execute(&mut ctx);
796 } else {
797 drop(task);
798 }
799
800 return result.map_err(|error| {
801 self.task_error_to_turbo_tasks_execution_error(&error, &mut ctx)
802 .with_task_context(task_id, turbo_tasks.pin())
803 .into()
804 });
805 }
806 drop(reader_task);
807
808 let note = EventDescription::new(|| {
809 move || {
810 if let Some(reader) = reader_description.as_ref() {
811 format!("try_read_task_output (recompute) from {reader}",)
812 } else {
813 "try_read_task_output (recompute, untracked)".to_string()
814 }
815 }
816 });
817
818 let (in_progress_state, listener) = InProgressState::new_scheduled_with_listener(
820 TaskExecutionReason::OutputNotAvailable,
821 EventDescription::new(|| task.get_task_desc_fn()),
822 note,
823 );
824
825 let old = task.set_in_progress(in_progress_state);
828 debug_assert!(old.is_none(), "InProgress already exists");
829 ctx.schedule_task(task, TaskPriority::Initial);
830
831 Ok(Err(listener))
832 }
833
    /// Attempts to read `cell` of `task_id` on behalf of `reader`.
    ///
    /// Returns `Ok(Ok(content))` when the cell content is available,
    /// `Ok(Err(listener))` when the task must (re)compute the cell first, and
    /// an error when the cell no longer exists or the task was canceled.
    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

        // Records the cell dependency in both directions: the reader as a
        // dependent on the cell, and the cell as a dependency of the reader.
        // Skipped for immutable tasks (unless verifying immutability).
        fn add_cell_dependency(
            task_id: TaskId,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            reader_task: Option<impl TaskGuard>,
            cell: CellId,
            key: Option<u64>,
        ) {
            if let Some(mut reader_task) = reader_task
                && (!task.immutable() || cfg!(feature = "verify_immutable"))
            {
                let reader = reader.unwrap();
                let _ = task.add_cell_dependents((cell, key, reader));
                drop(task);

                let target = CellRef {
                    task: task_id,
                    cell,
                };
                // Un-mark an outdated dependency if present; otherwise record
                // a new one.
                if !reader_task.remove_outdated_cell_dependencies(&(target, key)) {
                    let _ = reader_task.add_cell_dependencies((target, key));
                }
                drop(reader_task);
            }
        }

        let ReadCellOptions {
            is_serializable_cell_content,
            tracking,
            final_read_hint,
        } = options;

        let mut ctx = self.execute_context(turbo_tasks);
        // Also lock the reader task for tracked reads from a different task,
        // so the dependency edge can be recorded consistently.
        let (mut task, reader_task) = if self.should_track_dependencies()
            && !matches!(tracking, ReadCellTracking::Untracked)
            && let Some(reader_id) = reader
            && reader_id != task_id
        {
            let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
            (task, Some(reader))
        } else {
            (ctx.task(task_id, TaskDataCategory::All), None)
        };

        // A final read removes the content from the cell instead of reading
        // it in place.
        let content = if final_read_hint {
            task.remove_cell_data(is_serializable_cell_content, cell)
        } else {
            task.get_cell_data(is_serializable_cell_content, cell)
        };
        if let Some(content) = content {
            if tracking.should_track(false) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        // No content: if the task is already computing, wait on the cell's
        // in-progress event.
        let in_progress = task.get_in_progress();
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self
                .listen_to_cell(&mut task, task_id, &reader_task, cell)
                .0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));

        // Validate that the cell can still exist on this task; otherwise the
        // read is a hard error. A dependency is still recorded for tracked
        // reads — presumably so the reader is invalidated should the cell
        // reappear (NOTE(review): confirm).
        let max_id = task.get_cell_type_max_index(&cell.type_id).copied();
        let Some(max_id) = max_id else {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!(
                "Cell {cell:?} no longer exists in task {task_desc} (no cell of this type exists)",
            );
        };
        if cell.index >= max_id {
            let task_desc = task.get_task_description();
            if tracking.should_track(true) {
                add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key());
            }
            bail!("Cell {cell:?} no longer exists in task {task_desc} (index out of bounds)");
        }

        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
        drop(reader_task);
        if !new_listener {
            // An in-progress cell state already existed; reuse its event.
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = get_value_type(cell.type_id).ty.global_name,
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", task.get_task_description());
        }

        // Schedule the task so the missing cell gets recomputed.
        let _ = task.add_scheduled(
            TaskExecutionReason::CellNotAvailable,
            EventDescription::new(|| task.get_task_desc_fn()),
        );
        ctx.schedule_task(task, TaskPriority::Initial);

        Ok(Err(listener))
    }
971
972 fn listen_to_cell(
973 &self,
974 task: &mut impl TaskGuard,
975 task_id: TaskId,
976 reader_task: &Option<impl TaskGuard>,
977 cell: CellId,
978 ) -> (EventListener, bool) {
979 let note = || {
980 let reader_desc = reader_task.as_ref().map(|r| r.get_task_desc_fn());
981 move || {
982 if let Some(reader_desc) = reader_desc.as_ref() {
983 format!("try_read_task_cell (in progress) from {}", (reader_desc)())
984 } else {
985 "try_read_task_cell (in progress, untracked)".to_string()
986 }
987 }
988 };
989 if let Some(in_progress) = task.get_in_progress_cells(&cell) {
990 let listener = in_progress.event.listen_with_note(note);
992 return (listener, false);
993 }
994 let in_progress = InProgressCellState::new(task_id, cell);
995 let listener = in_progress.event.listen_with_note(note);
996 let old = task.insert_in_progress_cells(cell, in_progress);
997 debug_assert!(old.is_none(), "InProgressCell already exists");
998 (listener, true)
999 }
1000
1001 fn snapshot_and_persist(
1002 &self,
1003 parent_span: Option<tracing::Id>,
1004 reason: &str,
1005 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1006 ) -> Option<(Instant, bool)> {
1007 let snapshot_span =
1008 tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
1009 .entered();
1010 let start = Instant::now();
1011 let wall_start = SystemTime::now();
1015 debug_assert!(self.should_persist());
1016
1017 let suspended_operations;
1018 {
1019 let _span = tracing::info_span!("blocking").entered();
1020 let mut snapshot_request = self.snapshot_request.lock();
1021 snapshot_request.snapshot_requested = true;
1022 let active_operations = self
1023 .in_progress_operations
1024 .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1025 if active_operations != 0 {
1026 self.operations_suspended
1027 .wait_while(&mut snapshot_request, |_| {
1028 self.in_progress_operations.load(Ordering::Relaxed)
1029 != SNAPSHOT_REQUESTED_BIT
1030 });
1031 }
1032 suspended_operations = snapshot_request
1033 .suspended_operations
1034 .iter()
1035 .map(|op| op.arc().clone())
1036 .collect::<Vec<_>>();
1037 }
1038 self.storage.start_snapshot();
1039 let mut persisted_task_cache_log = self
1040 .persisted_task_cache_log
1041 .as_ref()
1042 .map(|l| l.take(|i| i))
1043 .unwrap_or_default();
1044 let mut snapshot_request = self.snapshot_request.lock();
1045 snapshot_request.snapshot_requested = false;
1046 self.in_progress_operations
1047 .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1048 self.snapshot_completed.notify_all();
1049 let snapshot_time = Instant::now();
1050 drop(snapshot_request);
1051
1052 #[cfg(feature = "print_cache_item_size")]
1053 #[derive(Default)]
1054 struct TaskCacheStats {
1055 data: usize,
1056 #[cfg(feature = "print_cache_item_size_with_compressed")]
1057 data_compressed: usize,
1058 data_count: usize,
1059 meta: usize,
1060 #[cfg(feature = "print_cache_item_size_with_compressed")]
1061 meta_compressed: usize,
1062 meta_count: usize,
1063 upper_count: usize,
1064 collectibles_count: usize,
1065 aggregated_collectibles_count: usize,
1066 children_count: usize,
1067 followers_count: usize,
1068 collectibles_dependents_count: usize,
1069 aggregated_dirty_containers_count: usize,
1070 output_size: usize,
1071 }
1072 #[cfg(feature = "print_cache_item_size")]
1075 struct FormatSizes {
1076 size: usize,
1077 #[cfg(feature = "print_cache_item_size_with_compressed")]
1078 compressed_size: usize,
1079 }
1080 #[cfg(feature = "print_cache_item_size")]
1081 impl std::fmt::Display for FormatSizes {
1082 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1083 use turbo_tasks::util::FormatBytes;
1084 #[cfg(feature = "print_cache_item_size_with_compressed")]
1085 {
1086 write!(
1087 f,
1088 "{} ({} compressed)",
1089 FormatBytes(self.size),
1090 FormatBytes(self.compressed_size)
1091 )
1092 }
1093 #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
1094 {
1095 write!(f, "{}", FormatBytes(self.size))
1096 }
1097 }
1098 }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            // LZ4-compresses `data` into a fresh buffer and returns the compressed
            // length. Only used to estimate the compressed on-disk footprint for
            // the stats table.
            #[cfg(feature = "print_cache_item_size_with_compressed")]
            fn compressed_size(data: &[u8]) -> Result<usize> {
                Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
                    data,
                    &mut Vec::new(),
                    lzzzz::lz4::ACC_LEVEL_DEFAULT,
                )?)
            }

            // Records one encoded "data"-category payload: raw size, optional
            // compressed size, and the payload count. Compression failures count
            // as 0 bytes instead of aborting stats collection.
            fn add_data(&mut self, data: &[u8]) {
                self.data += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.data_count += 1;
            }

            // Records one encoded "meta"-category payload; mirror of `add_data`.
            fn add_meta(&mut self, data: &[u8]) {
                self.meta += data.len();
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.meta_compressed += Self::compressed_size(data).unwrap_or(0);
                }
                self.meta_count += 1;
            }

            // Accumulates the task's edge/collectible counts and the size of its
            // bincode-encoded output (0 if encoding fails or there is no output).
            fn add_counts(&mut self, storage: &TaskStorage) {
                let counts = storage.meta_counts();
                self.upper_count += counts.upper;
                self.collectibles_count += counts.collectibles;
                self.aggregated_collectibles_count += counts.aggregated_collectibles;
                self.children_count += counts.children;
                self.followers_count += counts.followers;
                self.collectibles_dependents_count += counts.collectibles_dependents;
                self.aggregated_dirty_containers_count += counts.aggregated_dirty_containers;
                if let Some(output) = storage.get_output() {
                    use turbo_bincode::turbo_bincode_encode;

                    self.output_size += turbo_bincode_encode(&output)
                        .map(|data| data.len())
                        .unwrap_or(0);
                }
            }

            // Key used to group stats rows: the persistent task type's display
            // name, or "<unknown>" when the task has no persistent task type.
            fn task_name(storage: &TaskStorage) -> String {
                storage
                    .get_persistent_task_type()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "<unknown>".to_string())
            }

            // Sort key for the stats table: compressed totals when available,
            // otherwise raw data + meta totals.
            fn sort_key(&self) -> usize {
                #[cfg(feature = "print_cache_item_size_with_compressed")]
                {
                    self.data_compressed + self.meta_compressed
                }
                #[cfg(not(feature = "print_cache_item_size_with_compressed"))]
                {
                    self.data + self.meta
                }
            }

            // Combined data + meta sizes for the "Total Size" column.
            fn format_total(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data + self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed + self.meta_compressed,
                }
            }

            // Data-category sizes for the "Data Size" column.
            fn format_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.data_compressed,
                }
            }

            // Average data payload size; `checked_div` yields 0 when no payloads
            // were recorded (avoids a division-by-zero panic).
            fn format_avg_data(&self) -> FormatSizes {
                FormatSizes {
                    size: self.data.checked_div(self.data_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .data_compressed
                        .checked_div(self.data_count)
                        .unwrap_or(0),
                }
            }

            // Meta-category sizes for the "Meta Size" column.
            fn format_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta,
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self.meta_compressed,
                }
            }

            // Average meta payload size; 0 when no meta payloads were recorded.
            fn format_avg_meta(&self) -> FormatSizes {
                FormatSizes {
                    size: self.meta.checked_div(self.meta_count).unwrap_or(0),
                    #[cfg(feature = "print_cache_item_size_with_compressed")]
                    compressed_size: self
                        .meta_compressed
                        .checked_div(self.meta_count)
                        .unwrap_or(0),
                }
            }
        }
1213 #[cfg(feature = "print_cache_item_size")]
1214 let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
1215 Mutex::new(FxHashMap::default());
1216
1217 let process = |task_id: TaskId, inner: &TaskStorage, buffer: &mut TurboBincodeBuffer| {
1224 let encode_category = |task_id: TaskId,
1225 data: &TaskStorage,
1226 category: SpecificTaskDataCategory,
1227 buffer: &mut TurboBincodeBuffer|
1228 -> Option<TurboBincodeBuffer> {
1229 match encode_task_data(task_id, data, category, buffer) {
1230 Ok(encoded) => {
1231 #[cfg(feature = "print_cache_item_size")]
1232 {
1233 let mut stats = task_cache_stats.lock();
1234 let entry = stats.entry(TaskCacheStats::task_name(inner)).or_default();
1235 match category {
1236 SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
1237 SpecificTaskDataCategory::Data => entry.add_data(&encoded),
1238 }
1239 }
1240 Some(encoded)
1241 }
1242 Err(err) => {
1243 eprintln!(
1244 "Serializing task {} failed ({:?}): {:?}",
1245 self.debug_get_task_description(task_id),
1246 category,
1247 err
1248 );
1249 None
1250 }
1251 }
1252 };
1253 if task_id.is_transient() {
1254 unreachable!("transient task_ids should never be enqueued to be persisted");
1255 }
1256
1257 let encode_meta = inner.flags.meta_modified();
1258 let encode_data = inner.flags.data_modified();
1259
1260 #[cfg(feature = "print_cache_item_size")]
1261 if encode_data || encode_meta {
1262 task_cache_stats
1263 .lock()
1264 .entry(TaskCacheStats::task_name(inner))
1265 .or_default()
1266 .add_counts(inner);
1267 }
1268
1269 let meta = if encode_meta {
1270 encode_category(task_id, inner, SpecificTaskDataCategory::Meta, buffer)
1271 } else {
1272 None
1273 };
1274
1275 let data = if encode_data {
1276 encode_category(task_id, inner, SpecificTaskDataCategory::Data, buffer)
1277 } else {
1278 None
1279 };
1280
1281 SnapshotItem {
1282 task_id,
1283 meta,
1284 data,
1285 }
1286 };
1287
1288 let task_snapshots = self.storage.take_snapshot(&process);
1290
1291 swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1292
1293 drop(snapshot_span);
1294 let snapshot_duration = start.elapsed();
1295 let task_count = task_snapshots.len();
1296
1297 if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1298 return Some((snapshot_time, false));
1299 }
1300
1301 let persist_start = Instant::now();
1302 let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
1303 {
1304 if let Err(err) = self.backing_storage.save_snapshot(
1305 suspended_operations,
1306 persisted_task_cache_log,
1307 task_snapshots,
1308 ) {
1309 eprintln!("Persisting failed: {err:?}");
1310 return None;
1311 }
1312 #[cfg(feature = "print_cache_item_size")]
1313 {
1314 let mut task_cache_stats = task_cache_stats
1315 .into_inner()
1316 .into_iter()
1317 .collect::<Vec<_>>();
1318 if !task_cache_stats.is_empty() {
1319 use turbo_tasks::util::FormatBytes;
1320
1321 use crate::utils::markdown_table::print_markdown_table;
1322
1323 task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
1324 (stats_b.sort_key(), key_b).cmp(&(stats_a.sort_key(), key_a))
1325 });
1326
1327 println!(
1328 "Task cache stats: {}",
1329 FormatSizes {
1330 size: task_cache_stats
1331 .iter()
1332 .map(|(_, s)| s.data + s.meta)
1333 .sum::<usize>(),
1334 #[cfg(feature = "print_cache_item_size_with_compressed")]
1335 compressed_size: task_cache_stats
1336 .iter()
1337 .map(|(_, s)| s.data_compressed + s.meta_compressed)
1338 .sum::<usize>()
1339 },
1340 );
1341
1342 print_markdown_table(
1343 [
1344 "Task",
1345 " Total Size",
1346 " Data Size",
1347 " Data Count x Avg",
1348 " Data Count x Avg",
1349 " Meta Size",
1350 " Meta Count x Avg",
1351 " Meta Count x Avg",
1352 " Uppers",
1353 " Coll",
1354 " Agg Coll",
1355 " Children",
1356 " Followers",
1357 " Coll Deps",
1358 " Agg Dirty",
1359 " Output Size",
1360 ],
1361 task_cache_stats.iter(),
1362 |(task_desc, stats)| {
1363 [
1364 task_desc.to_string(),
1365 format!(" {}", stats.format_total()),
1366 format!(" {}", stats.format_data()),
1367 format!(" {} x", stats.data_count),
1368 format!("{}", stats.format_avg_data()),
1369 format!(" {}", stats.format_meta()),
1370 format!(" {} x", stats.meta_count),
1371 format!("{}", stats.format_avg_meta()),
1372 format!(" {}", stats.upper_count),
1373 format!(" {}", stats.collectibles_count),
1374 format!(" {}", stats.aggregated_collectibles_count),
1375 format!(" {}", stats.children_count),
1376 format!(" {}", stats.followers_count),
1377 format!(" {}", stats.collectibles_dependents_count),
1378 format!(" {}", stats.aggregated_dirty_containers_count),
1379 format!(" {}", FormatBytes(stats.output_size)),
1380 ]
1381 },
1382 );
1383 }
1384 }
1385 }
1386
1387 let elapsed = start.elapsed();
1388 let persist_duration = persist_start.elapsed();
1389 if elapsed > Duration::from_secs(10) {
1391 turbo_tasks.send_compilation_event(Arc::new(TimingEvent::new(
1392 "Finished writing to filesystem cache".to_string(),
1393 elapsed,
1394 )));
1395 }
1396
1397 let wall_start_ms = wall_start
1398 .duration_since(SystemTime::UNIX_EPOCH)
1399 .unwrap_or_default()
1400 .as_secs_f64()
1402 * 1000.0;
1403 let wall_end_ms = wall_start_ms + elapsed.as_secs_f64() * 1000.0;
1404 turbo_tasks.send_compilation_event(Arc::new(TraceEvent::new(
1405 "turbopack-persistence",
1406 wall_start_ms,
1407 wall_end_ms,
1408 vec![
1409 ("reason", serde_json::Value::from(reason)),
1410 (
1411 "snapshot_duration_ms",
1412 serde_json::Value::from(snapshot_duration.as_secs_f64() * 1000.0),
1413 ),
1414 (
1415 "persist_duration_ms",
1416 serde_json::Value::from(persist_duration.as_secs_f64() * 1000.0),
1417 ),
1418 ("task_count", serde_json::Value::from(task_count)),
1419 ],
1420 )));
1421
1422 Some((snapshot_time, true))
1423 }
1424
    // Backend startup hook: replays operations left unfinished by a previous
    // process, then (in read-write mode) schedules the initial snapshot job.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            // Replay operations that were suspended mid-flight when the previous
            // process shut down, so storage reaches a consistent state.
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if matches!(self.options.storage_mode, Some(StorageMode::ReadWrite)) {
            // NOTE(review): both span guards stay entered until the end of this
            // function — shadowing does not drop the first guard early. Confirm
            // whether nesting two spans here is intentional.
            let _span = trace_span!("persisting background job").entered();
            let _span = tracing::info_span!("thread").entered();
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }
1451
    // Signals that the backend is shutting down. The flag is stored with
    // Release ordering *before* waking waiters so any thread woken by the
    // event observes `stopping == true`.
    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }
1456
1457 #[allow(unused_variables)]
1458 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
1459 #[cfg(feature = "verify_aggregation_graph")]
1460 {
1461 self.is_idle.store(false, Ordering::Release);
1462 self.verify_aggregation_graph(turbo_tasks, false);
1463 }
1464 if self.should_persist() {
1465 self.snapshot_and_persist(Span::current().into(), "stop", turbo_tasks);
1466 }
1467 drop_contents(&self.task_cache);
1468 self.storage.drop_contents();
1469 if let Err(err) = self.backing_storage.shutdown() {
1470 println!("Shutting down failed: {err}");
1471 }
1472 }
1473
    // Called when the system becomes idle. With `verify_aggregation_graph`
    // enabled, schedules a verification pass that runs only if the system is
    // still idle 5 seconds later.
    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                // Wait 5s, but bail out early if idling ends first.
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                // Re-check the flag: idling may have ended between the sleep
                // completing and this check.
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }
1501
    // Called when the system leaves the idle state. The cfg attribute gates
    // only the single `store` statement that follows it.
    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }
1507
    // Looks up or creates the persistent task for `task_type`, connects it as a
    // child of `parent_task`, and returns its id.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Fast path: task already present in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }

        let mut ctx = self.execute_context(turbo_tasks);

        let mut is_new = false;
        // Slow path: consult the backing storage, otherwise mint a fresh id.
        // The raw-entry lookup is re-done in both branches because the cache
        // entry may have been inserted concurrently since the check above.
        let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
            self.track_cache_hit(&task_type);
            let task_type = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(_) => ArcOrOwned::Owned(task_type),
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    e.insert(task_type.clone(), task_id);
                    ArcOrOwned::Arc(task_type)
                }
            };
            (task_id, task_type)
        } else {
            let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
                RawEntry::Occupied(e) => {
                    // Another thread inserted the entry in the meantime.
                    let task_id = *e.get();
                    drop(e);
                    self.track_cache_hit(&task_type);
                    (task_id, ArcOrOwned::Owned(task_type))
                }
                RawEntry::Vacant(e) => {
                    let task_type = Arc::new(task_type);
                    let task_id = self.persisted_task_id_factory.get();
                    e.insert(task_type.clone(), task_id);
                    self.track_cache_miss(&task_type);
                    is_new = true;
                    // Log the new mapping so it is written out with the next
                    // persisted snapshot.
                    if let Some(log) = &self.persisted_task_cache_log {
                        log.lock(task_id).push((task_type.clone(), task_id));
                    }
                    (task_id, ArcOrOwned::Arc(task_type))
                }
            };
            (task_id, task_type)
        };
        // Newly created root functions get the maximum aggregation number.
        if is_new && is_root {
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: u32::MAX,
                    distance: None,
                },
                &mut ctx,
            );
        }
        operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);

        task_id
    }
1589
    // Looks up or creates a transient task for `task_type`, connects it as a
    // child of `parent_task`, and returns its id. Transient tasks are never
    // persisted; a persistent parent calling a transient task is a panic.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        let is_root = task_type.native_fn.is_root;

        // Persistent tasks must not depend on transient tasks (the dependency
        // could not be restored from the persisted cache).
        if let Some(parent_task) = parent_task
            && !parent_task.is_transient()
        {
            self.panic_persistent_calling_transient(
                self.debug_get_task_description(parent_task),
                Some(&task_type),
                None,
            );
        }
        // Fast path: task already present in the in-memory cache.
        if let Some(task_id) = self.task_cache.get(&task_type) {
            let task_id = *task_id;
            self.track_cache_hit(&task_type);
            self.connect_child(
                parent_task,
                task_id,
                Some(ArcOrOwned::Owned(task_type)),
                turbo_tasks,
            );
            return task_id;
        }
        // Slow path: re-check via the raw entry API since the entry may have
        // been inserted concurrently since the lookup above.
        match raw_entry(&self.task_cache, &task_type) {
            RawEntry::Occupied(e) => {
                let task_id = *e.get();
                drop(e);
                self.track_cache_hit(&task_type);
                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Owned(task_type)),
                    turbo_tasks,
                );
                task_id
            }
            RawEntry::Vacant(e) => {
                let task_type = Arc::new(task_type);
                let task_id = self.transient_task_id_factory.get();
                e.insert(task_type.clone(), task_id);
                self.track_cache_miss(&task_type);

                // New root functions get the maximum aggregation number.
                if is_root {
                    let mut ctx = self.execute_context(turbo_tasks);
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }

                self.connect_child(
                    parent_task,
                    task_id,
                    Some(ArcOrOwned::Arc(task_type)),
                    turbo_tasks,
                );

                task_id
            }
        }
    }
1662
1663 fn debug_trace_transient_task(
1666 &self,
1667 task_type: &CachedTaskType,
1668 cell_id: Option<CellId>,
1669 ) -> DebugTraceTransientTask {
1670 fn inner_id(
1673 backend: &TurboTasksBackendInner<impl BackingStorage>,
1674 task_id: TaskId,
1675 cell_type_id: Option<ValueTypeId>,
1676 visited_set: &mut FxHashSet<TaskId>,
1677 ) -> DebugTraceTransientTask {
1678 if let Some(task_type) = backend.debug_get_cached_task_type(task_id) {
1679 if visited_set.contains(&task_id) {
1680 let task_name = task_type.get_name();
1681 DebugTraceTransientTask::Collapsed {
1682 task_name,
1683 cell_type_id,
1684 }
1685 } else {
1686 inner_cached(backend, &task_type, cell_type_id, visited_set)
1687 }
1688 } else {
1689 DebugTraceTransientTask::Uncached { cell_type_id }
1690 }
1691 }
1692 fn inner_cached(
1693 backend: &TurboTasksBackendInner<impl BackingStorage>,
1694 task_type: &CachedTaskType,
1695 cell_type_id: Option<ValueTypeId>,
1696 visited_set: &mut FxHashSet<TaskId>,
1697 ) -> DebugTraceTransientTask {
1698 let task_name = task_type.get_name();
1699
1700 let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
1701 let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
1702 return None;
1706 };
1707 if task_id.is_transient() {
1708 Some(Box::new(inner_id(
1709 backend,
1710 task_id,
1711 cause_self_raw_vc.try_get_type_id(),
1712 visited_set,
1713 )))
1714 } else {
1715 None
1716 }
1717 });
1718 let cause_args = task_type
1719 .arg
1720 .get_raw_vcs()
1721 .into_iter()
1722 .filter_map(|raw_vc| {
1723 let Some(task_id) = raw_vc.try_get_task_id() else {
1724 return None;
1726 };
1727 if !task_id.is_transient() {
1728 return None;
1729 }
1730 Some((task_id, raw_vc.try_get_type_id()))
1731 })
1732 .collect::<IndexSet<_>>() .into_iter()
1734 .map(|(task_id, cell_type_id)| {
1735 inner_id(backend, task_id, cell_type_id, visited_set)
1736 })
1737 .collect();
1738
1739 DebugTraceTransientTask::Cached {
1740 task_name,
1741 cell_type_id,
1742 cause_self,
1743 cause_args,
1744 }
1745 }
1746 inner_cached(
1747 self,
1748 task_type,
1749 cell_id.map(|c| c.type_id),
1750 &mut FxHashSet::default(),
1751 )
1752 }
1753
1754 fn invalidate_task(
1755 &self,
1756 task_id: TaskId,
1757 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1758 ) {
1759 if !self.should_track_dependencies() {
1760 panic!("Dependency tracking is disabled so invalidation is not allowed");
1761 }
1762 operation::InvalidateOperation::run(
1763 smallvec![task_id],
1764 #[cfg(feature = "trace_task_dirty")]
1765 TaskDirtyCause::Invalidator,
1766 self.execute_context(turbo_tasks),
1767 );
1768 }
1769
1770 fn invalidate_tasks(
1771 &self,
1772 tasks: &[TaskId],
1773 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1774 ) {
1775 if !self.should_track_dependencies() {
1776 panic!("Dependency tracking is disabled so invalidation is not allowed");
1777 }
1778 operation::InvalidateOperation::run(
1779 tasks.iter().copied().collect(),
1780 #[cfg(feature = "trace_task_dirty")]
1781 TaskDirtyCause::Unknown,
1782 self.execute_context(turbo_tasks),
1783 );
1784 }
1785
1786 fn invalidate_tasks_set(
1787 &self,
1788 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
1789 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1790 ) {
1791 if !self.should_track_dependencies() {
1792 panic!("Dependency tracking is disabled so invalidation is not allowed");
1793 }
1794 operation::InvalidateOperation::run(
1795 tasks.iter().copied().collect(),
1796 #[cfg(feature = "trace_task_dirty")]
1797 TaskDirtyCause::Unknown,
1798 self.execute_context(turbo_tasks),
1799 );
1800 }
1801
1802 fn invalidate_serialization(
1803 &self,
1804 task_id: TaskId,
1805 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1806 ) {
1807 if task_id.is_transient() {
1808 return;
1809 }
1810 let mut ctx = self.execute_context(turbo_tasks);
1811 let mut task = ctx.task(task_id, TaskDataCategory::Data);
1812 task.invalidate_serialization();
1813 }
1814
1815 fn debug_get_task_description(&self, task_id: TaskId) -> String {
1816 let task = self.storage.access_mut(task_id);
1817 if let Some(value) = task.get_persistent_task_type() {
1818 format!("{task_id:?} {}", value)
1819 } else if let Some(value) = task.get_transient_task_type() {
1820 format!("{task_id:?} {}", value)
1821 } else {
1822 format!("{task_id:?} unknown")
1823 }
1824 }
1825
1826 fn get_task_name(
1827 &self,
1828 task_id: TaskId,
1829 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1830 ) -> String {
1831 let mut ctx = self.execute_context(turbo_tasks);
1832 let task = ctx.task(task_id, TaskDataCategory::Data);
1833 if let Some(value) = task.get_persistent_task_type() {
1834 value.to_string()
1835 } else if let Some(value) = task.get_transient_task_type() {
1836 value.to_string()
1837 } else {
1838 "unknown".to_string()
1839 }
1840 }
1841
1842 fn debug_get_cached_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
1843 let task = self.storage.access_mut(task_id);
1844 task.get_persistent_task_type().cloned()
1845 }
1846
    // Transitions a task into the `Canceled` state, waking anything that was
    // waiting on its completion event.
    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        // Notify waiters regardless of which in-progress state the task was in
        // (Canceled has no event to notify).
        if let Some(in_progress) = task.take_in_progress() {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        // The slot must be empty after the take above, so this cannot clobber
        // another state.
        let old = task.set_in_progress(InProgressState::Canceled);
        debug_assert!(old.is_none(), "InProgress already exists");
    }
1869
    // Attempts to move a `Scheduled` task into the `InProgress` state and build
    // its execution future. Returns `None` when the task is not in a startable
    // state (or a `Once` future was already consumed).
    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        priority: TaskPriority,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        let execution_reason;
        let task_type;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            task_type = task.get_task_type().to_owned();
            let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
            // If prefetching is requested, release the task, prepare the
            // prefetched tasks, then re-acquire it.
            if let Some(tasks) = task.prefetch() {
                drop(task);
                ctx.prepare_tasks(tasks);
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Only a `Scheduled` task may start; any other state is restored
            // unchanged and we bail out.
            let in_progress = task.take_in_progress()?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                let old = task.set_in_progress(in_progress);
                debug_assert!(old.is_none(), "InProgress already exists");
                return None;
            };
            execution_reason = reason;
            let old = task.set_in_progress(InProgressState::InProgress(Box::new(
                InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    new_children: Default::default(),
                },
            )));
            debug_assert!(old.is_none(), "InProgress already exists");

            // Move current collectibles into the outdated set and drop outdated
            // entries without a current counterpart. Collected into a Vec first
            // because the loop below mutates the task.
            enum Collectible {
                Current(CollectibleRef, i32),
                Outdated(CollectibleRef),
            }
            let collectibles = task
                .iter_collectibles()
                .map(|(&collectible, &value)| Collectible::Current(collectible, value))
                .chain(
                    task.iter_outdated_collectibles()
                        .map(|(collectible, _count)| Collectible::Outdated(*collectible)),
                )
                .collect::<Vec<_>>();
            for collectible in collectibles {
                match collectible {
                    Collectible::Current(collectible, value) => {
                        let _ = task.insert_outdated_collectible(collectible, value);
                    }
                    Collectible::Outdated(collectible) => {
                        if task
                            .collectibles()
                            .is_none_or(|m| m.get(&collectible).is_none())
                        {
                            task.remove_outdated_collectibles(&collectible);
                        }
                    }
                }
            }

            // Snapshot current cell/output dependencies as "outdated"; the new
            // execution's dependencies will be diffed against these.
            if self.should_track_dependencies() {
                let cell_dependencies = task.iter_cell_dependencies().collect();
                task.set_outdated_cell_dependencies(cell_dependencies);

                let outdated_output_dependencies = task.iter_output_dependencies().collect();
                task.set_outdated_output_dependencies(outdated_output_dependencies);
            }
        }

        // Build span + future outside the scope that held the task access.
        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason, priority),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    // A `Once` future can be taken only once; `?` returns None
                    // if it was already consumed.
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }
1969
    // Finalizes a finished task execution. Returns `true` when the execution
    // was detected as stale at any phase and the task must run again, `false`
    // when the result was committed.
    fn task_execution_completed(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> bool {
        // Two span variants: the detailed one additionally records the task id,
        // the result, and outcome fields that are filled in below.
        #[cfg(not(feature = "trace_task_details"))]
        let span = tracing::trace_span!(
            "task execution completed",
            new_children = tracing::field::Empty
        )
        .entered();
        #[cfg(feature = "trace_task_details")]
        let span = tracing::trace_span!(
            "task execution completed",
            task_id = display(task_id),
            result = match result.as_ref() {
                Ok(value) => display(either::Either::Left(value)),
                Err(err) => display(either::Either::Right(err)),
            },
            new_children = tracing::field::Empty,
            immutable = tracing::field::Empty,
            new_output = tracing::field::Empty,
            output_dependents = tracing::field::Empty,
            stale = tracing::field::Empty,
        )
        .entered();

        let is_error = result.is_err();

        let mut ctx = self.execute_context(turbo_tasks);

        // Phase 1: update counters/edges and compute the new output value.
        // `None` means the execution went stale and must be repeated.
        let Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        }) = self.task_execution_completed_prepare(
            &mut ctx,
            #[cfg(feature = "trace_task_details")]
            &span,
            task_id,
            result,
            cell_counters,
            #[cfg(feature = "verify_determinism")]
            stateful,
            has_invalidator,
        )
        else {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "prepare");
            return true;
        };

        #[cfg(feature = "trace_task_details")]
        span.record("new_output", new_output.is_some());
        #[cfg(feature = "trace_task_details")]
        span.record("output_dependents", output_dependent_tasks.len());

        // Phase 2: invalidate tasks that depend on this task's changed output.
        if !output_dependent_tasks.is_empty() {
            self.task_execution_completed_invalidate_output_dependent(
                &mut ctx,
                task_id,
                output_dependent_tasks,
            );
        }

        let has_new_children = !new_children.is_empty();
        span.record("new_children", new_children.len());

        if has_new_children {
            self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
        }

        // Phase 3: connect new children; this can detect staleness again.
        if has_new_children
            && self.task_execution_completed_connect(&mut ctx, task_id, new_children)
        {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "connect");
            return true;
        }

        // Phase 4: commit the new output and finish; last staleness check.
        let (stale, in_progress_cells) = self.task_execution_completed_finish(
            &mut ctx,
            task_id,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            is_now_immutable,
        );
        if stale {
            #[cfg(feature = "trace_task_details")]
            span.record("stale", "finish");
            return true;
        }

        let removed_data =
            self.task_execution_completed_cleanup(&mut ctx, task_id, cell_counters, is_error);

        // Explicit drops keep these values alive until after cleanup finished.
        drop(removed_data);
        drop(in_progress_cells);

        false
    }
2101
    // Phase 1 of `task_execution_completed`: reconciles the task's in-progress
    // state, cell counters, edges and collectibles, and computes the new output
    // value. Returns `None` when the execution was stale and has been
    // rescheduled instead.
    fn task_execution_completed_prepare(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        #[cfg(feature = "trace_task_details")] span: &Span,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        #[cfg(feature = "verify_determinism")] stateful: bool,
        has_invalidator: bool,
    ) -> Option<TaskExecutionCompletePrepareResult> {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress_mut() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // A canceled task's result is discarded: return an empty prepare result.
        if matches!(in_progress, InProgressState::Canceled) {
            return Some(TaskExecutionCompletePrepareResult {
                new_children: Default::default(),
                is_now_immutable: false,
                #[cfg(feature = "verify_determinism")]
                no_output_set: false,
                new_output: None,
                output_dependent_tasks: Default::default(),
            });
        }
        let &mut InProgressState::InProgress(box InProgressStateInner {
            stale,
            ref mut new_children,
            session_dependent,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // Fast-stale path: the task was invalidated while executing. Reschedule
        // it immediately and undo the active counts of children that are not
        // already connected.
        #[cfg(not(feature = "no_fast_stale"))]
        if stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner {
                done_event,
                mut new_children,
                ..
            })) = task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            // Children that are already connected keep their active counts.
            for task in task.iter_children() {
                new_children.remove(&task);
            }
            drop(task);

            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return None;
        }

        let mut new_children = take(new_children);

        #[cfg(feature = "verify_determinism")]
        if stateful {
            task.set_stateful(true);
        }

        if has_invalidator {
            task.set_invalidator(true);
        }

        // Reconcile the per-cell-type max indices with the counters observed
        // during this execution; entries not seen this time are removed.
        let old_counters: FxHashMap<_, _> = task
            .iter_cell_type_max_index()
            .map(|(&k, &v)| (k, v))
            .collect();
        let mut counters_to_remove = old_counters.clone();

        for (&cell_type, &max_index) in cell_counters.iter() {
            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
                if old_max_index != max_index {
                    task.insert_cell_type_max_index(cell_type, max_index);
                }
            } else {
                task.insert_cell_type_max_index(cell_type, max_index);
            }
        }
        for (cell_type, _) in counters_to_remove {
            task.remove_cell_type_max_index(&cell_type);
        }

        let mut queue = AggregationUpdateQueue::new();

        let mut old_edges = Vec::new();

        let has_children = !new_children.is_empty();
        let is_immutable = task.immutable();
        // Candidate check for marking the task immutable: only possible when it
        // is not session dependent, has no invalidator and no collectibles
        // dependencies. The set holds the tasks whose immutability is verified
        // further below (after this task access is dropped).
        let task_dependencies_for_immutable =
            if !is_immutable
                && !session_dependent
                && !task.invalidator()
                && task.is_collectibles_dependencies_empty()
            {
                Some(
                    task.iter_output_dependencies()
                        .chain(task.iter_cell_dependencies().map(|(target, _key)| target.task))
                        .collect::<FxHashSet<_>>(),
                )
            } else {
                None
            };

        // Children present before but not reported by this execution become
        // outdated child edges; `new_children` is reduced to truly new ones.
        if has_children {
            prepare_new_children(task_id, &mut task, &new_children, &mut queue);

            old_edges.extend(
                task.iter_children()
                    .filter(|task| !new_children.remove(task))
                    .map(OutdatedEdge::Child),
            );
        } else {
            old_edges.extend(task.iter_children().map(OutdatedEdge::Child));
        }

        old_edges.extend(
            task.iter_outdated_collectibles()
                .map(|(&collectible, &count)| OutdatedEdge::Collectible(collectible, count)),
        );

        if self.should_track_dependencies() {
            old_edges.extend(
                task.iter_outdated_cell_dependencies()
                    .map(|(target, key)| OutdatedEdge::CellDependency(target, key)),
            );
            old_edges.extend(
                task.iter_outdated_output_dependencies()
                    .map(OutdatedEdge::OutputDependency),
            );
        }

        // Compute the new output; `None` means it is unchanged compared to the
        // currently stored output (equal task ref, cell ref, or error).
        let current_output = task.get_output();
        #[cfg(feature = "verify_determinism")]
        let no_output_set = current_output.is_none();
        let new_output = match result {
            Ok(RawVc::TaskOutput(output_task_id)) => {
                if let Some(OutputValue::Output(current_task_id)) = current_output
                    && *current_task_id == output_task_id
                {
                    None
                } else {
                    Some(OutputValue::Output(output_task_id))
                }
            }
            Ok(RawVc::TaskCell(output_task_id, cell)) => {
                if let Some(OutputValue::Cell(CellRef {
                    task: current_task_id,
                    cell: current_cell,
                })) = current_output
                    && *current_task_id == output_task_id
                    && *current_cell == cell
                {
                    None
                } else {
                    Some(OutputValue::Cell(CellRef {
                        task: output_task_id,
                        cell,
                    }))
                }
            }
            Ok(RawVc::LocalOutput(..)) => {
                panic!("Non-local tasks must not return a local Vc");
            }
            Err(err) => {
                if let Some(OutputValue::Error(old_error)) = current_output
                    && **old_error == err
                {
                    None
                } else {
                    Some(OutputValue::Error(Arc::new((&err).into())))
                }
            }
        };
        // Only a changed output invalidates dependents (and only when
        // dependency tracking is enabled).
        let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
        if new_output.is_some() && ctx.should_track_dependencies() {
            output_dependent_tasks = task.iter_output_dependent().collect();
        }

        drop(task);

        // The task becomes immutable only if every task it depends on is
        // already immutable.
        let mut is_now_immutable = false;
        if let Some(dependencies) = task_dependencies_for_immutable
            && dependencies
                .iter()
                .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).immutable())
        {
            is_now_immutable = true;
        }
        #[cfg(feature = "trace_task_details")]
        span.record("immutable", is_immutable || is_now_immutable);

        if !queue.is_empty() || !old_edges.is_empty() {
            #[cfg(feature = "trace_task_completion")]
            let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
            CleanupOldEdgesOperation::run(task_id, old_edges, queue, ctx);
        }

        Some(TaskExecutionCompletePrepareResult {
            new_children,
            is_now_immutable,
            #[cfg(feature = "verify_determinism")]
            no_output_set,
            new_output,
            output_dependent_tasks,
        })
    }
2348
    /// Invalidates every task that depends on `task_id`'s output after that output
    /// changed, marking each dependent dirty (and stale where appropriate) so it gets
    /// re-executed.
    fn task_execution_completed_invalidate_output_dependent(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        output_dependent_tasks: SmallVec<[TaskId; 4]>,
    ) {
        debug_assert!(!output_dependent_tasks.is_empty());

        // Prefetch the task data of all dependents when more than one will be touched.
        if output_dependent_tasks.len() > 1 {
            ctx.prepare_tasks(
                output_dependent_tasks
                    .iter()
                    .map(|&id| (id, TaskDataCategory::All)),
            );
        }

        /// Marks a single dependent task dirty because `task_id`'s output changed.
        fn process_output_dependents(
            ctx: &mut impl ExecuteContext<'_>,
            task_id: TaskId,
            dependent_task_id: TaskId,
            queue: &mut AggregationUpdateQueue,
        ) {
            #[cfg(feature = "trace_task_output_dependencies")]
            let span = tracing::trace_span!(
                "invalidate output dependency",
                task = %task_id,
                dependent_task = %dependent_task_id,
                result = tracing::field::Empty,
            )
            .entered();
            let mut make_stale = true;
            let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
            let transient_task_type = dependent.get_transient_task_type();
            // Once tasks are skipped entirely (they are not re-executed on change).
            if transient_task_type.is_some_and(|tt| matches!(&**tt, TransientTask::Once(_))) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "once task");
                return;
            }
            if dependent.outdated_output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "outdated dependency");
                // The dependency edge is already outdated: still mark dirty, but not
                // stale.
                make_stale = false;
            } else if !dependent.output_dependencies_contains(&task_id) {
                #[cfg(feature = "trace_task_output_dependencies")]
                span.record("result", "no backward dependency");
                // No backward dependency edge exists; nothing to invalidate.
                return;
            }
            make_task_dirty_internal(
                dependent,
                dependent_task_id,
                make_stale,
                #[cfg(feature = "trace_task_dirty")]
                TaskDirtyCause::OutputChange { task_id },
                queue,
                ctx,
            );
            #[cfg(feature = "trace_task_output_dependencies")]
            span.record("result", "marked dirty");
        }

        // For large dependent sets, split the work into chunks processed on parallel
        // worker threads, each with its own child context and aggregation queue.
        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
            let chunk_size = good_chunk_size(output_dependent_tasks.len());
            let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
            let _ = scope_and_block(chunks.len(), |scope| {
                for chunk in chunks {
                    let child_ctx = ctx.child_context();
                    scope.spawn(move || {
                        let mut ctx = child_ctx.create();
                        let mut queue = AggregationUpdateQueue::new();
                        for dependent_task_id in chunk {
                            process_output_dependents(
                                &mut ctx,
                                task_id,
                                dependent_task_id,
                                &mut queue,
                            )
                        }
                        queue.execute(&mut ctx);
                    });
                }
            });
        } else {
            // Small set: process sequentially on the current context.
            let mut queue = AggregationUpdateQueue::new();
            for dependent_task_id in output_dependent_tasks {
                process_output_dependents(ctx, task_id, dependent_task_id, &mut queue);
            }
            queue.execute(ctx);
        }
    }
2445
    /// Marks every new child that has not yet produced an output as dirty so that it
    /// gets executed.
    fn task_execution_completed_unfinished_children_dirty(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        new_children: &FxHashSet<TaskId>,
    ) {
        debug_assert!(!new_children.is_empty());

        let mut queue = AggregationUpdateQueue::new();
        ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
            // A child without an output has never finished an execution.
            if !child_task.has_output() {
                let child_id = child_task.id();
                make_task_dirty_internal(
                    child_task,
                    child_id,
                    false,
                    #[cfg(feature = "trace_task_dirty")]
                    TaskDirtyCause::InitialDirty,
                    &mut queue,
                    ctx,
                );
            }
        });

        queue.execute(ctx);
    }
2471
    /// Connects the children discovered during the finished execution to the task.
    ///
    /// Returns `true` when the task became stale while executing (without the
    /// `no_fast_stale` feature) and has been rescheduled instead — the caller must treat
    /// the result as outdated. Returns `false` when the task was canceled or the
    /// children were connected normally.
    fn task_execution_completed_connect(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        new_children: FxHashSet<TaskId>,
    ) -> bool {
        debug_assert!(!new_children.is_empty());

        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.get_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return false;
        }
        let InProgressState::InProgress(box InProgressStateInner {
            #[cfg(not(feature = "no_fast_stale"))]
            stale,
            once_task: is_once_task,
            ..
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };

        // If the task became stale during execution, reschedule it right away instead
        // of connecting the children: the just-computed result is already outdated.
        // Once tasks (`is_once_task`) are excluded from this.
        #[cfg(not(feature = "no_fast_stale"))]
        if *stale && !is_once_task {
            let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
                task.take_in_progress()
            else {
                unreachable!();
            };
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);

            // The new children won't be connected now; decrease their active counts.
            AggregationUpdateQueue::run(
                AggregationUpdateJob::DecreaseActiveCounts {
                    task_ids: new_children.into_iter().collect(),
                },
                ctx,
            );
            return true;
        }

        let has_active_count = ctx.should_track_activeness()
            && task
                .get_activeness()
                .is_some_and(|activeness| activeness.active_counter > 0);
        connect_children(
            ctx,
            task_id,
            task,
            new_children,
            has_active_count,
            ctx.should_track_activeness(),
        );

        false
    }
2539
    /// Final bookkeeping after a task execution: stores the new output, updates the
    /// task's dirty state, propagates aggregated updates, and notifies waiters.
    ///
    /// Returns `(reschedule, in_progress_cells)`:
    /// - `reschedule`: `true` when the task was put back into the scheduled state (it
    ///   became stale during execution, or — with `verify_determinism` — a change was
    ///   detected that warrants a re-run) and must execute again.
    /// - `in_progress_cells`: cell states readers were waiting on; their events have
    ///   already been notified when this returns.
    fn task_execution_completed_finish(
        &self,
        ctx: &mut impl ExecuteContext<'_>,
        task_id: TaskId,
        #[cfg(feature = "verify_determinism")] no_output_set: bool,
        new_output: Option<OutputValue>,
        is_now_immutable: bool,
    ) -> (
        bool,
        Option<
            auto_hash_map::AutoMap<CellId, InProgressCellState, BuildHasherDefault<FxHasher>, 1>,
        >,
    ) {
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let Some(in_progress) = task.take_in_progress() else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        if matches!(in_progress, InProgressState::Canceled) {
            return (false, None);
        }
        let InProgressState::InProgress(box InProgressStateInner {
            done_event,
            once_task: is_once_task,
            stale,
            session_dependent,
            marked_as_completed: _,
            new_children,
        }) = in_progress
        else {
            panic!("Task execution completed, but task is not in progress: {task:#?}");
        };
        // New children must have been drained and connected before this step.
        debug_assert!(new_children.is_empty());

        // Stale (non-once) tasks are rescheduled; the computed result is outdated.
        if stale && !is_once_task {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            return (true, None);
        }

        // Store the new output (when it changed). The old content is kept alive until
        // after the task lock is released and only dropped at the end.
        let mut old_content = None;
        if let Some(value) = new_output {
            old_content = task.set_output(value);
        }

        if is_now_immutable {
            task.set_immutable(true);
        }

        // Wake every reader waiting on a cell written during this execution.
        let in_progress_cells = task.take_in_progress_cells();
        if let Some(ref cells) = in_progress_cells {
            for state in cells.values() {
                state.event.notify(usize::MAX);
            }
        }

        // Capture the self-dirty state before this execution finished...
        let old_dirtyness = task.get_dirty().cloned();
        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
            None => (false, false),
            Some(Dirtyness::Dirty(_)) => (true, false),
            Some(Dirtyness::SessionDependent) => {
                let clean_in_current_session = task.current_session_clean();
                (true, clean_in_current_session)
            }
        };

        // ...and compute the new one: session dependent tasks remain dirty but are
        // marked clean for the current session; otherwise the task becomes fully clean.
        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
            (Some(Dirtyness::SessionDependent), true, true)
        } else {
            (None, false, false)
        };

        let dirty_changed = old_dirtyness != new_dirtyness;
        if dirty_changed {
            if let Some(value) = new_dirtyness {
                task.set_dirty(value);
            } else if old_dirtyness.is_some() {
                task.take_dirty();
            }
        }
        if old_current_session_self_clean != new_current_session_self_clean {
            if new_current_session_self_clean {
                task.set_current_session_clean(true);
            } else if old_current_session_self_clean {
                task.set_current_session_clean(false);
            }
        }

        // When the self-dirty state changed, compute the aggregated data update that
        // has to be propagated to upper tasks.
        let data_update = if old_self_dirty != new_self_dirty
            || old_current_session_self_clean != new_current_session_self_clean
        {
            let dirty_container_count = task
                .get_aggregated_dirty_container_count()
                .cloned()
                .unwrap_or_default();
            let current_session_clean_container_count = task
                .get_aggregated_current_session_clean_container_count()
                .copied()
                .unwrap_or_default();
            let result = ComputeDirtyAndCleanUpdate {
                old_dirty_container_count: dirty_container_count,
                new_dirty_container_count: dirty_container_count,
                old_current_session_clean_container_count: current_session_clean_container_count,
                new_current_session_clean_container_count: current_session_clean_container_count,
                old_self_dirty,
                new_self_dirty,
                old_current_session_self_clean,
                new_current_session_self_clean,
            }
            .compute();
            // Net transition towards clean: wake "all clean" waiters and drop the
            // active-until-clean flag (and the whole activeness state when empty).
            if result.dirty_count_update - result.current_session_clean_update < 0 {
                if let Some(activeness_state) = task.get_activeness_mut() {
                    activeness_state.all_clean_event.notify(usize::MAX);
                    activeness_state.unset_active_until_clean();
                    if activeness_state.is_empty() {
                        task.take_activeness();
                    }
                }
            }
            result
                .aggregated_update(task_id)
                .and_then(|aggregated_update| {
                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
                })
        } else {
            None
        };

        // With `verify_determinism`, re-run persistent non-once tasks whose dirtiness
        // or output unexpectedly changed, to surface non-deterministic behavior.
        #[cfg(feature = "verify_determinism")]
        let reschedule =
            (dirty_changed || no_output_set) && !task_id.is_transient() && !is_once_task;
        #[cfg(not(feature = "verify_determinism"))]
        let reschedule = false;
        if reschedule {
            let old = task.set_in_progress(InProgressState::Scheduled {
                done_event,
                reason: TaskExecutionReason::Stale,
            });
            debug_assert!(old.is_none(), "InProgress already exists");
            drop(task);
        } else {
            drop(task);

            // Execution is done: wake everyone waiting on this task.
            done_event.notify(usize::MAX);
        }

        drop(old_content);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, ctx);
        }

        (reschedule, in_progress_cells)
    }
2709
2710 fn task_execution_completed_cleanup(
2711 &self,
2712 ctx: &mut impl ExecuteContext<'_>,
2713 task_id: TaskId,
2714 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
2715 is_error: bool,
2716 ) -> Vec<SharedReference> {
2717 let mut task = ctx.task(task_id, TaskDataCategory::All);
2718 let mut removed_cell_data = Vec::new();
2719 if !is_error {
2723 let to_remove_persistent: Vec<_> = task
2730 .iter_persistent_cell_data()
2731 .filter_map(|(cell, _)| {
2732 cell_counters
2733 .get(&cell.type_id)
2734 .is_none_or(|start_index| cell.index >= *start_index)
2735 .then_some(*cell)
2736 })
2737 .collect();
2738
2739 let to_remove_transient: Vec<_> = task
2741 .iter_transient_cell_data()
2742 .filter_map(|(cell, _)| {
2743 cell_counters
2744 .get(&cell.type_id)
2745 .is_none_or(|start_index| cell.index >= *start_index)
2746 .then_some(*cell)
2747 })
2748 .collect();
2749 removed_cell_data.reserve_exact(to_remove_persistent.len() + to_remove_transient.len());
2750 for cell in to_remove_persistent {
2751 if let Some(data) = task.remove_persistent_cell_data(&cell) {
2752 removed_cell_data.push(data.into_untyped());
2753 }
2754 }
2755 for cell in to_remove_transient {
2756 if let Some(data) = task.remove_transient_cell_data(&cell) {
2757 removed_cell_data.push(data);
2758 }
2759 }
2760 }
2761
2762 task.cleanup_after_execution();
2766
2767 drop(task);
2768
2769 removed_cell_data
2771 }
2772
    /// Runs a backend background job. Currently these are the snapshot jobs, which
    /// periodically persist in-memory state to the backing storage and compact the
    /// database while idle.
    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
                    let mut idle_start_listener = self.idle_start_event.listen();
                    let mut idle_end_listener = self.idle_end_event.listen();
                    let mut fresh_idle = true;
                    loop {
                        // The very first snapshot waits longer than the follow-up
                        // snapshots, which run on a regular interval.
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        let idle_timeout = *IDLE_TIMEOUT;
                        let (time, mut reason) =
                            if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                                (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout")
                            } else {
                                (SNAPSHOT_INTERVAL, "regular snapshot interval")
                            };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            // While idle, a (shorter) idle timeout may trigger the
                            // snapshot earlier than the regular interval.
                            let mut idle_time = if turbo_tasks.is_idle() && fresh_idle {
                                Instant::now() + idle_timeout
                            } else {
                                far_future()
                            };
                            // Wait for the interval or the idle timeout to elapse,
                            // tracking idle start/end events; abort when stopping.
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        fresh_idle = true;
                                        idle_time = Instant::now() + idle_timeout;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + idle_timeout;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        // Only snapshot on idle timeout if still idle.
                                        if turbo_tasks.is_idle() {
                                            reason = "idle timeout";
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let background_span =
                            tracing::info_span!(parent: None, "background snapshot");
                        let snapshot =
                            this.snapshot_and_persist(background_span.id(), reason, turbo_tasks);
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;

                            // After persisting, run compaction passes until the storage
                            // reports there is nothing left to compact, an error occurs,
                            // or idleness ends.
                            const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                            for _ in 0..MAX_IDLE_COMPACTION_PASSES {
                                // Non-blocking check (biased + ready fallback) whether
                                // idle ended in the meantime.
                                let idle_ended = tokio::select! {
                                    biased;
                                    _ = &mut idle_end_listener => {
                                        idle_end_listener = self.idle_end_event.listen();
                                        true
                                    },
                                    _ = std::future::ready(()) => false,
                                };
                                if idle_ended {
                                    break;
                                }
                                let _compact_span = tracing::info_span!(
                                    parent: background_span.id(),
                                    "compact database"
                                )
                                .entered();
                                match self.backing_storage.compact() {
                                    Ok(true) => {}
                                    Ok(false) => break,
                                    Err(err) => {
                                        eprintln!("Compaction failed: {err:?}");
                                        break;
                                    }
                                }
                            }

                            if !new_data {
                                // Nothing new was persisted: keep looping in this job
                                // without treating idleness as fresh again.
                                fresh_idle = false;
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            // Hand off to a fresh follow-up snapshot job.
                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
            }
        })
    }
2905
2906 fn try_read_own_task_cell(
2907 &self,
2908 task_id: TaskId,
2909 cell: CellId,
2910 options: ReadCellOptions,
2911 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2912 ) -> Result<TypedCellContent> {
2913 let mut ctx = self.execute_context(turbo_tasks);
2914 let task = ctx.task(task_id, TaskDataCategory::Data);
2915 if let Some(content) = task.get_cell_data(options.is_serializable_cell_content, cell) {
2916 debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
2917 Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
2918 } else {
2919 Ok(CellContent(None).into_typed(cell.type_id))
2920 }
2921 }
2922
    /// Reads all collectibles of `collectible_type` aggregated in `task_id`.
    ///
    /// The task's aggregation number is raised to a root node first so its aggregated
    /// collectibles cover the whole subgraph. When `reader_id` is given, dependency
    /// edges are recorded in both directions so the reader is invalidated on changes.
    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            // Make the task a root aggregation node. The aggregation number update runs
            // without holding the task lock, so re-check in a loop until it sticks.
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
            // Collectibles aggregated from the subgraph (each contributes 1)...
            for (collectible, count) in task.iter_aggregated_collectibles() {
                if *count > 0 && collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += 1;
                }
            }
            // ...plus the task's own collectibles (contributing their count).
            for (&collectible, &count) in task.iter_collectibles() {
                if collectible.collectible_type == collectible_type {
                    *collectibles
                        .entry(RawVc::TaskCell(
                            collectible.cell.task,
                            collectible.cell.cell,
                        ))
                        .or_insert(0) += count;
                }
            }
            if let Some(reader_id) = reader_id {
                // Forward edge: the read task records its dependent reader.
                let _ = task.add_collectibles_dependents((collectible_type, reader_id));
            }
        }
        if let Some(reader_id) = reader_id {
            // Backward edge: the reader records what it depends on (reviving an
            // outdated edge when one exists).
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if !reader.remove_outdated_collectibles_dependencies(&target) {
                let _ = reader.add_collectibles_dependencies(target);
            }
        }
        collectibles
    }
2987
2988 fn emit_collectible(
2989 &self,
2990 collectible_type: TraitTypeId,
2991 collectible: RawVc,
2992 task_id: TaskId,
2993 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
2994 ) {
2995 self.assert_valid_collectible(task_id, collectible);
2996
2997 let RawVc::TaskCell(collectible_task, cell) = collectible else {
2998 panic!("Collectibles need to be resolved");
2999 };
3000 let cell = CellRef {
3001 task: collectible_task,
3002 cell,
3003 };
3004 operation::UpdateCollectibleOperation::run(
3005 task_id,
3006 CollectibleRef {
3007 collectible_type,
3008 cell,
3009 },
3010 1,
3011 self.execute_context(turbo_tasks),
3012 );
3013 }
3014
3015 fn unemit_collectible(
3016 &self,
3017 collectible_type: TraitTypeId,
3018 collectible: RawVc,
3019 count: u32,
3020 task_id: TaskId,
3021 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3022 ) {
3023 self.assert_valid_collectible(task_id, collectible);
3024
3025 let RawVc::TaskCell(collectible_task, cell) = collectible else {
3026 panic!("Collectibles need to be resolved");
3027 };
3028 let cell = CellRef {
3029 task: collectible_task,
3030 cell,
3031 };
3032 operation::UpdateCollectibleOperation::run(
3033 task_id,
3034 CollectibleRef {
3035 collectible_type,
3036 cell,
3037 },
3038 -(i32::try_from(count).unwrap()),
3039 self.execute_context(turbo_tasks),
3040 );
3041 }
3042
    /// Writes new content into a task cell by running an `UpdateCellOperation` with the
    /// given content, optional key hashes / content hash, and verification mode.
    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<[u8; 16]>,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            is_serializable_cell_content,
            updated_key_hashes,
            content_hash,
            verification_mode,
            self.execute_context(turbo_tasks),
        );
    }
3065
    /// Marks the currently executing task as session dependent. No-op when dependency
    /// tracking is disabled.
    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            // Without dependency tracking there is no re-execution, so session
            // dependence is irrelevant.
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        // Ensure a high aggregation number for session dependent tasks.
        // NOTE(review): presumably so session-start invalidation propagates efficiently
        // — confirm against aggregation docs.
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            // The aggregation update must run without holding the task lock.
            drop(task);
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        // Record the flag on the in-progress state (no-op when not executing).
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = task.get_in_progress_mut()
        {
            *session_dependent = true;
        }
    }
3101
3102 fn mark_own_task_as_finished(
3103 &self,
3104 task: TaskId,
3105 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3106 ) {
3107 let mut ctx = self.execute_context(turbo_tasks);
3108 let mut task = ctx.task(task, TaskDataCategory::Data);
3109 if let Some(InProgressState::InProgress(box InProgressStateInner {
3110 marked_as_completed,
3111 ..
3112 })) = task.get_in_progress_mut()
3113 {
3114 *marked_as_completed = true;
3115 }
3120 }
3121
    /// Connects `task` as a child of `parent_task`. Panics when a persistent parent
    /// tries to connect to a transient child.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, None, self.execute_context(turbo_tasks));
    }
3131
3132 fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
3133 let task_id = self.transient_task_id_factory.get();
3134 {
3135 let mut task = self.storage.access_mut(task_id);
3136 task.init_transient_task(task_id, task_type, self.should_track_activeness());
3137 }
3138 #[cfg(feature = "verify_aggregation_graph")]
3139 self.root_tasks.lock().insert(task_id);
3140 task_id
3141 }
3142
    /// Disposes a root task. While dirty work remains it stays active until clean (but
    /// is no longer a root); once clean, its activeness is dropped and waiters are
    /// notified.
    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = task.is_dirty();
        let has_dirty_containers = task.has_dirty_containers();
        if is_dirty.is_some() || has_dirty_containers {
            // Dirty work pending: drop the root marker but keep the task active until
            // everything is clean.
            if let Some(activeness_state) = task.get_activeness_mut() {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            };
        } else if let Some(activeness_state) = task.take_activeness() {
            // Already clean: remove activeness and wake anyone waiting for cleanliness.
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }
3167
    /// Debug-only consistency check of the aggregation graph (behind the
    /// `verify_aggregation_graph` feature). Walks the subgraph of every root task and
    /// panics when a reachable task doesn't report to any root, when a dirty task is
    /// missing from an upper's dirty containers, or when a collectible is aggregated
    /// but never emitted. Can be disabled at runtime with `TURBO_ENGINE_VERIFY_GRAPH=0`.
    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }
        // `use` items are visible in the whole enclosing block, so the `env::var` call
        // above already resolves through this import.
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            // Breadth-first traversal of the root task's subgraph.
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            // collectible -> (emitted by some visited task?, aggregating task ids)
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                // Progress output for very large graphs.
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                // When verifying an idle system, abort as soon as it stops being idle.
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                // Every reachable non-root task must report to some aggregated node.
                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
                         {:?})",
                        task_id,
                        task.get_task_description(),
                        uppers
                    );
                }

                // Record which tasks aggregate each collectible...
                for (collectible, _) in task.iter_aggregated_collectibles() {
                    collectibles
                        .entry(*collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                // ...and mark collectibles actually emitted by a task.
                for (&collectible, &value) in task.iter_collectibles() {
                    if value > 0 {
                        if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                            *flag = true
                        } else {
                            panic!(
                                "Task {} has a collectible {:?} that is not in any upper task",
                                task_id, collectible
                            );
                        }
                    }
                }

                let is_dirty = task.has_dirty();
                let has_dirty_container = task.has_dirty_containers();
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
                for child_id in task.iter_children() {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

                // A dirty task must be listed as a dirty container in all its uppers.
                if should_be_in_upper {
                    for upper_id in uppers {
                        let upper = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = upper
                            .get_aggregated_dirty_containers(&task_id)
                            .is_some_and(|&dirty| dirty > 0);
                        if !in_upper {
                            let containers: Vec<_> = upper
                                .iter_aggregated_dirty_containers()
                                .map(|(&k, &v)| (k, v))
                                .collect();
                            let upper_task_desc = upper.get_task_description();
                            drop(upper);
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})\nThese dirty containers are present:\n{:#?}",
                                task_id,
                                ctx.task(task_id, TaskDataCategory::Data)
                                    .get_task_description(),
                                upper_id,
                                upper_task_desc,
                                containers,
                            );
                        }
                    }
                }
            }

            // Any collectible that was aggregated somewhere but never emitted indicates
            // a broken upper connection; dump the upper chain before panicking.
            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!(
                                "{t} {}",
                                ctx.task(*t, TaskDataCategory::Data).get_task_description()
                            ))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    // Walk the upper graph of the emitting task and print each edge.
                    while let Some(task_id) = queue.pop() {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let desc = task.get_task_description();
                        let aggregated_collectible = task
                            .get_aggregated_collectibles(&collectible)
                            .copied()
                            .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that doesn't \
                                 reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            if !visited.contains(&upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }
3356
3357 fn assert_not_persistent_calling_transient(
3358 &self,
3359 parent_id: Option<TaskId>,
3360 child_id: TaskId,
3361 cell_id: Option<CellId>,
3362 ) {
3363 if let Some(parent_id) = parent_id
3364 && !parent_id.is_transient()
3365 && child_id.is_transient()
3366 {
3367 self.panic_persistent_calling_transient(
3368 self.debug_get_task_description(parent_id),
3369 self.debug_get_cached_task_type(child_id).as_deref(),
3370 cell_id,
3371 );
3372 }
3373 }
3374
3375 fn panic_persistent_calling_transient(
3376 &self,
3377 parent: String,
3378 child: Option<&CachedTaskType>,
3379 cell_id: Option<CellId>,
3380 ) {
3381 let transient_reason = if let Some(child) = child {
3382 Cow::Owned(format!(
3383 " The callee is transient because it depends on:\n{}",
3384 self.debug_trace_transient_task(child, cell_id),
3385 ))
3386 } else {
3387 Cow::Borrowed("")
3388 };
3389 panic!(
3390 "Persistent task {} is not allowed to call, read, or connect to transient tasks {}.{}",
3391 parent,
3392 child.map_or("unknown", |t| t.get_name()),
3393 transient_reason,
3394 );
3395 }
3396
    /// Validates that `collectible` may be emitted from `task_id`: it must be a
    /// resolved `RawVc::TaskCell`, and a persistent task must not emit a transient
    /// collectible. Panics with a diagnostic message otherwise.
    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            // Best-effort description of the producing task for the panic message.
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .map(|t| self.debug_get_task_description(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            // Explain why the collectible is transient when its task type is known.
            let transient_reason =
                if let Some(col_task_ty) = self.debug_get_cached_task_type(col_task_id) {
                    Cow::Owned(format!(
                        ". The collectible is transient because it depends on:\n{}",
                        self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                    ))
                } else {
                    Cow::Borrowed("")
                };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
3428}
3429
3430impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    /// Forwards startup to the inner backend implementation.
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }
3434
    /// Forwards the stopping notification to the inner backend implementation.
    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }
3438
    /// Forwards stop to the inner backend implementation.
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }
3442
    /// Forwards the idle-start notification to the inner backend implementation.
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }
3446
    /// Forwards the idle-end notification to the inner backend implementation.
    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }
3450
    /// Forwards persistent task creation/lookup to the inner backend implementation.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }
3460
    /// Forwards transient task creation/lookup to the inner backend implementation.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }
3470
3471 fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3472 self.0.invalidate_task(task_id, turbo_tasks);
3473 }
3474
3475 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3476 self.0.invalidate_tasks(tasks, turbo_tasks);
3477 }
3478
3479 fn invalidate_tasks_set(
3480 &self,
3481 tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
3482 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3483 ) {
3484 self.0.invalidate_tasks_set(tasks, turbo_tasks);
3485 }
3486
3487 fn invalidate_serialization(
3488 &self,
3489 task_id: TaskId,
3490 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3491 ) {
3492 self.0.invalidate_serialization(task_id, turbo_tasks);
3493 }
3494
3495 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3496 self.0.task_execution_canceled(task, turbo_tasks)
3497 }
3498
3499 fn try_start_task_execution(
3500 &self,
3501 task_id: TaskId,
3502 priority: TaskPriority,
3503 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3504 ) -> Option<TaskExecutionSpec<'_>> {
3505 self.0
3506 .try_start_task_execution(task_id, priority, turbo_tasks)
3507 }
3508
3509 fn task_execution_completed(
3510 &self,
3511 task_id: TaskId,
3512 result: Result<RawVc, TurboTasksExecutionError>,
3513 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
3514 #[cfg(feature = "verify_determinism")] stateful: bool,
3515 has_invalidator: bool,
3516 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3517 ) -> bool {
3518 self.0.task_execution_completed(
3519 task_id,
3520 result,
3521 cell_counters,
3522 #[cfg(feature = "verify_determinism")]
3523 stateful,
3524 has_invalidator,
3525 turbo_tasks,
3526 )
3527 }
3528
3529 type BackendJob = TurboTasksBackendJob;
3530
3531 fn run_backend_job<'a>(
3532 &'a self,
3533 job: Self::BackendJob,
3534 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
3535 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
3536 self.0.run_backend_job(job, turbo_tasks)
3537 }
3538
3539 fn try_read_task_output(
3540 &self,
3541 task_id: TaskId,
3542 reader: Option<TaskId>,
3543 options: ReadOutputOptions,
3544 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3545 ) -> Result<Result<RawVc, EventListener>> {
3546 self.0
3547 .try_read_task_output(task_id, reader, options, turbo_tasks)
3548 }
3549
3550 fn try_read_task_cell(
3551 &self,
3552 task_id: TaskId,
3553 cell: CellId,
3554 reader: Option<TaskId>,
3555 options: ReadCellOptions,
3556 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3557 ) -> Result<Result<TypedCellContent, EventListener>> {
3558 self.0
3559 .try_read_task_cell(task_id, reader, cell, options, turbo_tasks)
3560 }
3561
3562 fn try_read_own_task_cell(
3563 &self,
3564 task_id: TaskId,
3565 cell: CellId,
3566 options: ReadCellOptions,
3567 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3568 ) -> Result<TypedCellContent> {
3569 self.0
3570 .try_read_own_task_cell(task_id, cell, options, turbo_tasks)
3571 }
3572
3573 fn read_task_collectibles(
3574 &self,
3575 task_id: TaskId,
3576 collectible_type: TraitTypeId,
3577 reader: Option<TaskId>,
3578 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3579 ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
3580 self.0
3581 .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
3582 }
3583
3584 fn emit_collectible(
3585 &self,
3586 collectible_type: TraitTypeId,
3587 collectible: RawVc,
3588 task_id: TaskId,
3589 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3590 ) {
3591 self.0
3592 .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
3593 }
3594
3595 fn unemit_collectible(
3596 &self,
3597 collectible_type: TraitTypeId,
3598 collectible: RawVc,
3599 count: u32,
3600 task_id: TaskId,
3601 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3602 ) {
3603 self.0
3604 .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
3605 }
3606
3607 fn update_task_cell(
3608 &self,
3609 task_id: TaskId,
3610 cell: CellId,
3611 is_serializable_cell_content: bool,
3612 content: CellContent,
3613 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
3614 content_hash: Option<[u8; 16]>,
3615 verification_mode: VerificationMode,
3616 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3617 ) {
3618 self.0.update_task_cell(
3619 task_id,
3620 cell,
3621 is_serializable_cell_content,
3622 content,
3623 updated_key_hashes,
3624 content_hash,
3625 verification_mode,
3626 turbo_tasks,
3627 );
3628 }
3629
3630 fn mark_own_task_as_finished(
3631 &self,
3632 task_id: TaskId,
3633 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3634 ) {
3635 self.0.mark_own_task_as_finished(task_id, turbo_tasks);
3636 }
3637
3638 fn mark_own_task_as_session_dependent(
3639 &self,
3640 task: TaskId,
3641 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3642 ) {
3643 self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3644 }
3645
3646 fn connect_task(
3647 &self,
3648 task: TaskId,
3649 parent_task: Option<TaskId>,
3650 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3651 ) {
3652 self.0.connect_task(task, parent_task, turbo_tasks);
3653 }
3654
3655 fn create_transient_task(
3656 &self,
3657 task_type: TransientTaskType,
3658 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3659 ) -> TaskId {
3660 self.0.create_transient_task(task_type)
3661 }
3662
3663 fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
3664 self.0.dispose_root_task(task_id, turbo_tasks);
3665 }
3666
3667 fn task_statistics(&self) -> &TaskStatisticsApi {
3668 &self.0.task_statistics
3669 }
3670
3671 fn is_tracking_dependencies(&self) -> bool {
3672 self.0.options.dependency_tracking
3673 }
3674
3675 fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String {
3676 self.0.get_task_name(task, turbo_tasks)
3677 }
3678}
3679
/// A node in the debug trace explaining why a task is transient.
///
/// Rendered through its `Display` impl (via `fmt_indented`) as an indented
/// tree. NOTE(review): presumably built by `debug_trace_transient_task`
/// (whose result is formatted with `{}` above) — confirm against that helper.
enum DebugTraceTransientTask {
    /// A cached task with its transitive causes: `cause_self` explains why
    /// the task itself is transient, `cause_args` why its arguments are.
    /// Causes are printed one indentation level deeper under "self:"/"args:".
    Cached {
        task_name: &'static str,
        // Type of the cell read from this task, if any; rendered as
        // " (read cell of type <name>)".
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
    /// A cached task shown without its causes; rendered with a
    /// " (collapsed)" suffix. NOTE(review): presumably used to avoid
    /// repeating subtrees already printed — confirm at the construction site.
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    /// A task whose type information is unavailable; rendered as
    /// "unknown transient task".
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}
3696
impl DebugTraceTransientTask {
    /// Writes this node — and, for `Cached` nodes, its causes — to `f` as an
    /// indented tree, four spaces per nesting `level`.
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = "    ".repeat(level);
        f.write_str(&indent)?;

        // Appends " (read cell of type <global name>)" when a cell type is
        // known; writes nothing otherwise.
        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(
                    f,
                    " (read cell of type {})",
                    get_value_type(ty).ty.global_name
                )
            } else {
                Ok(())
            }
        }

        // Header line: task name (or a placeholder for `Uncached`), optional
        // cell-type annotation, and a "(collapsed)" marker where applicable.
        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

        // Only `Cached` nodes carry causes; recurse one level deeper for each.
        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent}  self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent}  args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}
3763
3764impl fmt::Display for DebugTraceTransientTask {
3765 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3766 self.fmt_indented(f, 0)
3767 }
3768}
3769
/// Returns an `Instant` so far in the future that it effectively never
/// elapses, usable as a "no deadline" sentinel.
///
/// NOTE(review): 30 years mirrors tokio's internal `far_future` helper;
/// presumably chosen because much larger offsets can overflow `Instant`
/// arithmetic on some platforms — confirm.
fn far_future() -> Instant {
    // 30 years expressed as seconds: 60 s * 60 min * 24 h * 365 d * 30 y.
    const THIRTY_YEARS: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);
    Instant::now() + THIRTY_YEARS
}
3778
3779fn encode_task_data(
3784 task: TaskId,
3785 data: &TaskStorage,
3786 category: SpecificTaskDataCategory,
3787 scratch_buffer: &mut TurboBincodeBuffer,
3788) -> Result<TurboBincodeBuffer> {
3789 scratch_buffer.clear();
3790 let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
3791 data.encode(category, &mut encoder)?;
3792
3793 if cfg!(feature = "verify_serialization") {
3794 TaskStorage::new()
3795 .decode(
3796 category,
3797 &mut new_turbo_bincode_decoder(&scratch_buffer[..]),
3798 )
3799 .with_context(|| {
3800 format!(
3801 "expected to be able to decode serialized data for '{category:?}' information \
3802 for {task}"
3803 )
3804 })?;
3805 }
3806 Ok(SmallVec::from_slice(scratch_buffer))
3807}