mod dynamic_storage;
mod operation;
mod storage;

use std::{
    borrow::Cow,
    cmp::min,
    fmt::{self, Write},
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    ops::Range,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
    },
    thread::available_parallelism,
};

use anyhow::{Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use tracing::{field::Empty, info_span};
use turbo_tasks::{
    CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
    SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
    ValueTypeId,
    backend::{
        Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
    },
    event::{Event, EventListener},
    message_queue::TimingEvent,
    registry::{self, get_value_type_global_name},
    task_statistics::TaskStatisticsApi,
    trace::TraceRawVcs,
    turbo_tasks,
    util::{IdFactoryWithReuse, good_chunk_size},
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::TaskDirtyCause;
use crate::{
    backend::{
        operation::{
            AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
            Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
            get_uppers, is_root_node, prepare_new_children,
        },
        storage::{
            InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
            iter_many, remove,
        },
    },
    backing_storage::BackingStorage,
    data::{
        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
        CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
        InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
    },
    utils::{
        bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
        ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
    },
};

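/// Bit in `in_progress_operations` that is set while a snapshot is requested;
/// the remaining bits count the currently running operations.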
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

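/// Shared state for the snapshot protocol: whether a snapshot is currently
/// requested and the set of operations that are suspended while it is taken.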
struct SnapshotRequest {
    snapshot_requested: bool,
    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
    fn new() -> Self {
        Self {
            snapshot_requested: false,
            suspended_operations: FxHashSet::default(),
        }
    }
}

type TransientTaskOnce =
    Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

pub enum TransientTask {
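    /// A root task backed by a factory function; a fresh future is created
    /// for every (re-)execution.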
    Root(TransientTaskRoot),

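    /// A one-off task: its future is taken out of the mutex on first
    /// execution, so it runs at most once.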
    Once(TransientTaskOnce),
}

pub enum StorageMode {
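    /// Restore data from the backing storage, but do not persist new data.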
    ReadOnly,
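    /// Restore data from and persist new data to the backing storage.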
    ReadWrite,
}

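/// Options for configuring a [`TurboTasksBackend`].
///
/// A minimal sketch of constructing options for a backend without
/// persistence, relying on the `Default` impl below (hypothetical usage):
///
/// ```ignore
/// let options = BackendOptions {
///     storage_mode: None,
///     ..Default::default()
/// };
/// ```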
pub struct BackendOptions {
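    /// Enables dependency tracking. When disabled, invalidation is not
    /// allowed and `active_tracking` is forced off, which suits one-shot
    /// runs that never recompute.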
    pub dependency_tracking: bool,

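    /// Enables tracking of parent/child relationships between tasks.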
    pub children_tracking: bool,

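    /// Enables tracking of which tasks are active. Requires
    /// `dependency_tracking` and is disabled automatically without it.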
    pub active_tracking: bool,

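    /// Controls persistence: `None` disables the backing storage entirely,
    /// `Some(StorageMode::ReadOnly)` only restores data, and
    /// `Some(StorageMode::ReadWrite)` also persists new data.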
    pub storage_mode: Option<StorageMode>,

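    /// Uses a smaller initial allocation for the in-memory task storage.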
    pub small_preallocation: bool,
}

impl Default for BackendOptions {
    fn default() -> Self {
        Self {
            dependency_tracking: true,
            children_tracking: true,
            active_tracking: true,
            storage_mode: Some(StorageMode::ReadWrite),
            small_preallocation: false,
        }
    }
}

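/// Background jobs that the backend schedules on the turbo-tasks runtime via
/// `schedule_backend_background_job`.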
pub enum TurboTasksBackendJob {
    InitialSnapshot,
    FollowUpSnapshot,
    Prefetch {
        data: Arc<FxIndexMap<TaskId, bool>>,
        range: Option<Range<usize>>,
    },
}

pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;

struct TurboTasksBackendInner<B: BackingStorage> {
    options: BackendOptions,

    start_time: Instant,
    session_id: SessionId,

    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

    persisted_task_cache_log: Option<TaskCacheLog>,
    task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
    transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

    storage: Storage,

    local_is_partial: AtomicBool,

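    /// Number of currently running operations; the highest bit
    /// (`SNAPSHOT_REQUESTED_BIT`) is set while a snapshot is requested.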
    in_progress_operations: AtomicUsize,

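    /// Coordination between operations and snapshots: while a snapshot is
    /// requested, operations suspend at `operation_suspend_point`, the
    /// snapshot thread waits on `operations_suspended`, and suspended
    /// operations resume once `snapshot_completed` is signalled.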
    snapshot_request: Mutex<SnapshotRequest>,
    operations_suspended: Condvar,
    snapshot_completed: Condvar,
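    /// Time of the last snapshot, in milliseconds since `start_time`.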
    last_snapshot: AtomicU64,

    stopping: AtomicBool,
    stopping_event: Event,
    idle_start_event: Event,
    idle_end_event: Event,
    #[cfg(feature = "verify_aggregation_graph")]
    is_idle: AtomicBool,

    task_statistics: TaskStatisticsApi,

    backing_storage: B,

    #[cfg(feature = "verify_aggregation_graph")]
    root_tasks: Mutex<FxHashSet<TaskId>>,
}

impl<B: BackingStorage> TurboTasksBackend<B> {
    pub fn new(options: BackendOptions, backing_storage: B) -> Self {
        Self(Arc::new(TurboTasksBackendInner::new(
            options,
            backing_storage,
        )))
    }

    pub fn backing_storage(&self) -> &B {
        &self.0.backing_storage
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
        let shard_amount =
            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
        let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
        if !options.dependency_tracking {
            options.active_tracking = false;
        }
        let small_preallocation = options.small_preallocation;
        let next_task_id = backing_storage
            .next_free_task_id()
            .expect("Failed to get task id");
        Self {
            options,
            start_time: Instant::now(),
            session_id: backing_storage
                .next_session_id()
                .expect("Failed to get session id"),
            persisted_task_id_factory: IdFactoryWithReuse::new(
                next_task_id,
                TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
            ),
            transient_task_id_factory: IdFactoryWithReuse::new(
                TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                TaskId::MAX,
            ),
            persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
            task_cache: BiMap::new(),
            transient_tasks: FxDashMap::default(),
            local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
            storage: Storage::new(small_preallocation),
            in_progress_operations: AtomicUsize::new(0),
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
            last_snapshot: AtomicU64::new(0),
            stopping: AtomicBool::new(false),
            stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
            idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
            idle_end_event: Event::new(|| || "TurboTasksBackend::idle_end_event".to_string()),
            #[cfg(feature = "verify_aggregation_graph")]
            is_idle: AtomicBool::new(false),
            task_statistics: TaskStatisticsApi::default(),
            backing_storage,
            #[cfg(feature = "verify_aggregation_graph")]
            root_tasks: Default::default(),
        }
    }

    fn execute_context<'a>(
        &'a self,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'a> {
        ExecuteContextImpl::new(self, turbo_tasks)
    }

    fn session_id(&self) -> SessionId {
        self.session_id
    }

    unsafe fn execute_context_with_tx<'e, 'tx>(
        &'e self,
        tx: Option<&'e B::ReadTransaction<'tx>>,
        turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> impl ExecuteContext<'e> + use<'e, 'tx, B>
    where
        'tx: 'e,
    {
        unsafe { ExecuteContextImpl::new_with_tx(self, tx, turbo_tasks) }
    }

    fn suspending_requested(&self) -> bool {
        self.should_persist()
            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
    }

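    /// Cooperative suspension point for operations: if a snapshot has been
    /// requested, captures the operation's current state via `suspend`,
    /// registers it as suspended, and parks until the snapshot completes.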
    fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
        #[cold]
        fn operation_suspend_point_cold<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            suspend: impl FnOnce() -> AnyOperation,
        ) {
            let operation = Arc::new(suspend());
            let mut snapshot_request = this.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                snapshot_request
                    .suspended_operations
                    .insert(operation.clone().into());
                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
                if value == SNAPSHOT_REQUESTED_BIT {
                    this.operations_suspended.notify_all();
                }
                this.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
                snapshot_request
                    .suspended_operations
                    .remove(&operation.into());
            }
        }

        if self.suspending_requested() {
            operation_suspend_point_cold(self, suspend);
        }
    }

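    /// Registers a running operation for the snapshot protocol. If a snapshot
    /// is currently requested, blocks until it completes. The returned guard
    /// unregisters the operation on drop.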
    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
        if !self.should_persist() {
            return OperationGuard { backend: None };
        }
        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
            let mut snapshot_request = self.snapshot_request.lock();
            if snapshot_request.snapshot_requested {
                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
                if value == SNAPSHOT_REQUESTED_BIT {
                    self.operations_suspended.notify_all();
                }
                self.snapshot_completed
                    .wait_while(&mut snapshot_request, |snapshot_request| {
                        snapshot_request.snapshot_requested
                    });
                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
            }
        }
        OperationGuard {
            backend: Some(self),
        }
    }

    fn should_persist(&self) -> bool {
        matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
    }

    fn should_restore(&self) -> bool {
        self.options.storage_mode.is_some()
    }

    fn should_track_dependencies(&self) -> bool {
        self.options.dependency_tracking
    }

    fn should_track_activeness(&self) -> bool {
        self.options.active_tracking
    }

    fn should_track_children(&self) -> bool {
        self.options.children_tracking
    }

    fn track_cache_hit(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_hit(task_type.native_fn));
    }

    fn track_cache_miss(&self, task_type: &CachedTaskType) {
        self.task_statistics
            .map(|stats| stats.increment_cache_miss(task_type.native_fn));
    }
}

pub(crate) struct OperationGuard<'a, B: BackingStorage> {
    backend: Option<&'a TurboTasksBackendInner<B>>,
}

impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
    fn drop(&mut self) {
        if let Some(backend) = self.backend {
            let fetch_sub = backend
                .in_progress_operations
                .fetch_sub(1, Ordering::AcqRel);
            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
                backend.operations_suspended.notify_all();
            }
        }
    }
}

impl<B: BackingStorage> TurboTasksBackendInner<B> {
    unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
        &'l self,
        tx: Option<&'l B::ReadTransaction<'tx>>,
        parent_task: TaskId,
        child_task: TaskId,
        turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(parent_task, child_task, unsafe {
            self.execute_context_with_tx(tx, turbo_tasks)
        });
    }

    fn connect_child(
        &self,
        parent_task: TaskId,
        child_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::ConnectChildOperation::run(
            parent_task,
            child_task,
            self.execute_context(turbo_tasks),
        );
    }

    fn try_read_task_output(
        self: &Arc<Self>,
        task_id: TaskId,
        reader: Option<TaskId>,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<RawVc, EventListener>> {
        if let Some(reader) = reader {
            self.assert_not_persistent_calling_transient(reader, task_id, None);
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);

        fn listen_to_done_event<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            reader: Option<TaskId>,
            done_event: &Event,
        ) -> EventListener {
            done_event.listen_with_note(move || {
                let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
                move || {
                    if let Some(reader_desc) = reader_desc.as_ref() {
                        format!("try_read_task_output from {}", reader_desc())
                    } else {
                        "try_read_task_output (untracked)".to_string()
                    }
                }
            })
        }

        fn check_in_progress<B: BackingStorage>(
            this: &TurboTasksBackendInner<B>,
            task: &impl TaskGuard,
            reader: Option<TaskId>,
            ctx: &impl ExecuteContext<'_>,
        ) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
        {
            match get!(task, InProgress) {
                Some(InProgressState::Scheduled { done_event, .. }) => {
                    Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
                }
                Some(InProgressState::InProgress(box InProgressStateInner {
                    done,
                    done_event,
                    ..
                })) => {
                    if !*done {
                        Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
                    } else {
                        None
                    }
                }
                Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
                    "{} was canceled",
                    ctx.get_task_description(task.id())
                ))),
                None => None,
            }
        }

        if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                {
                    let _span = tracing::trace_span!(
                        "make root node for strongly consistent read",
                        %task_id
                    )
                    .entered();
                    AggregationUpdateQueue::run(
                        AggregationUpdateJob::UpdateAggregationNumber {
                            task_id,
                            base_aggregation_number: u32::MAX,
                            distance: None,
                        },
                        &mut ctx,
                    );
                }
                task = ctx.task(task_id, TaskDataCategory::All);
            }

            let is_dirty =
                get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));

            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                .cloned()
                .unwrap_or_default()
                .get(self.session_id);
            if dirty_tasks > 0 || is_dirty {
                let activeness = get_mut!(task, Activeness);
                let mut task_ids_to_schedule: Vec<_> = Vec::new();
                let activeness = if let Some(activeness) = activeness {
                    activeness.set_active_until_clean();
                    activeness
                } else {
                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
                        .set_active_until_clean();
                    if ctx.should_track_activeness() {
                        task_ids_to_schedule = get_many!(
                            task,
                            AggregatedDirtyContainer {
                                task
                            } count if count.get(self.session_id) > 0 => {
                                task
                            }
                        );
                        task_ids_to_schedule.push(task_id);
                    }
                    get!(task, Activeness).unwrap()
                };
                let listener = activeness.all_clean_event.listen_with_note(move || {
                    let this = self.clone();
                    let tt = turbo_tasks.pin();
                    move || {
                        let tt: &dyn TurboTasksBackendApi<TurboTasksBackend<B>> = &*tt;
                        let mut ctx = this.execute_context(tt);
                        let mut visited = FxHashSet::default();
                        fn indent(s: &str) -> String {
                            s.split_inclusive('\n')
                                .flat_map(|line: &str| ["    ", line].into_iter())
                                .collect::<String>()
                        }
                        fn get_info(
                            ctx: &mut impl ExecuteContext<'_>,
                            task_id: TaskId,
                            parent_and_count: Option<(TaskId, i32)>,
                            visited: &mut FxHashSet<TaskId>,
                        ) -> String {
                            let task = ctx.task(task_id, TaskDataCategory::Data);
                            let is_dirty = get!(task, Dirty)
                                .map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
                            let in_progress =
                                get!(task, InProgress).map_or("not in progress", |p| match p {
                                    InProgressState::InProgress(_) => "in progress",
                                    InProgressState::Scheduled { .. } => "scheduled",
                                    InProgressState::Canceled => "canceled",
                                });
                            let activeness = get!(task, Activeness).map_or_else(
                                || "not active".to_string(),
                                |activeness| format!("{activeness:?}"),
                            );
                            let aggregation_number = get_aggregation_number(&task);
                            let missing_upper = if let Some((parent_task_id, _)) = parent_and_count
                            {
                                let uppers = get_uppers(&task);
                                !uppers.contains(&parent_task_id)
                            } else {
                                false
                            };

                            let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
                                .cloned()
                                .unwrap_or_default()
                                .get(ctx.session_id());

                            let task_description = ctx.get_task_description(task_id);
                            let is_dirty = if is_dirty { ", dirty" } else { "" };
                            let count = if let Some((_, count)) = parent_and_count {
                                format!(" {count}")
                            } else {
                                String::new()
                            };
                            let mut info = format!(
                                "{task_id} {task_description}{count} (aggr={aggregation_number}, \
                                 {in_progress}, {activeness}{is_dirty})",
                            );
                            let children: Vec<_> = iter_many!(
                                task,
                                AggregatedDirtyContainer {
                                    task
                                } count => {
                                    (task, count.get(ctx.session_id()))
                                }
                            )
                            .filter(|(_, count)| *count > 0)
                            .collect();
                            drop(task);

                            if missing_upper {
                                info.push_str("\n ERROR: missing upper connection");
                            }

                            if dirty_tasks > 0 || !children.is_empty() {
                                writeln!(info, "\n {dirty_tasks} dirty tasks:").unwrap();

                                for (child_task_id, count) in children {
                                    let task_description = ctx.get_task_description(child_task_id);
                                    if visited.insert(child_task_id) {
                                        let child_info = get_info(
                                            ctx,
                                            child_task_id,
                                            Some((task_id, count)),
                                            visited,
                                        );
                                        info.push_str(&indent(&child_info));
                                        if !info.ends_with('\n') {
                                            info.push('\n');
                                        }
                                    } else {
                                        writeln!(
                                            info,
                                            " {child_task_id} {task_description} {count} \
                                             (already visited)"
                                        )
                                        .unwrap();
                                    }
                                }
                            }
                            info
                        }
                        let info = get_info(&mut ctx, task_id, None, &mut visited);
                        format!(
                            "try_read_task_output (strongly consistent) from {reader:?}\n{info}"
                        )
                    }
                });
                drop(task);
                if !task_ids_to_schedule.is_empty() {
                    let mut queue = AggregationUpdateQueue::new();
                    queue.extend_find_and_schedule_dirty(task_ids_to_schedule);
                    queue.execute(&mut ctx);
                }

                return Ok(Err(listener));
            }
        }

        if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
            return value;
        }

        if let Some(output) = get!(task, Output) {
            let result = match output {
                OutputValue::Cell(cell) => Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
                OutputValue::Output(task) => Ok(Ok(RawVc::TaskOutput(*task))),
                OutputValue::Error(error) => {
                    let err: anyhow::Error = error.clone().into();
                    Err(err.context(format!(
                        "Execution of {} failed",
                        ctx.get_task_description(task_id)
                    )))
                }
            };
            if self.should_track_dependencies()
                && let Some(reader) = reader
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                let _ = task.add(CachedDataItem::OutputDependent {
                    task: reader,
                    value: (),
                });
                drop(task);

                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::OutputDependency {
                        target: task_id,
                        value: (),
                    });
                }
            }

            return result;
        }

        let note = move || {
            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_output (recompute) from {}", (reader_desc)())
                } else {
                    "try_read_task_output (recompute, untracked)".to_string()
                }
            }
        };

        let (item, listener) = CachedDataItem::new_scheduled_with_listener(
            TaskExecutionReason::OutputNotAvailable,
            || self.get_task_desc_fn(task_id),
            note,
        );
        task.add_new(item);
        ctx.schedule_task(task);

        Ok(Err(listener))
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        if let Some(reader) = reader {
            self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
        }

        fn add_cell_dependency<B: BackingStorage>(
            backend: &TurboTasksBackendInner<B>,
            mut task: impl TaskGuard,
            reader: Option<TaskId>,
            cell: CellId,
            task_id: TaskId,
            ctx: &mut impl ExecuteContext<'_>,
        ) {
            if backend.should_track_dependencies()
                && let Some(reader) = reader
                && reader != task_id
                && (!task.is_immutable() || cfg!(feature = "verify_immutable"))
            {
                let _ = task.add(CachedDataItem::CellDependent {
                    cell,
                    task: reader,
                    value: (),
                });
                drop(task);

                let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
                let target = CellRef {
                    task: task_id,
                    cell,
                };
                if reader_task
                    .remove(&CachedDataItemKey::OutdatedCellDependency { target })
                    .is_none()
                {
                    let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        let content = if options.final_read_hint {
            remove!(task, CellData { cell })
        } else if let Some(content) = get!(task, CellData { cell }) {
            let content = content.clone();
            Some(content)
        } else {
            None
        };
        if let Some(content) = content {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            return Ok(Ok(TypedCellContent(
                cell.type_id,
                CellContent(Some(content.reference)),
            )));
        }

        let in_progress = get!(task, InProgress);
        if matches!(
            in_progress,
            Some(InProgressState::InProgress(..) | InProgressState::Scheduled { .. })
        ) {
            return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
        }
        let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
        let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));

        let max_id = get!(
            task,
            CellTypeMaxIndex {
                cell_type: cell.type_id
            }
        )
        .copied();
        let Some(max_id) = max_id else {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            bail!(
                "Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
                ctx.get_task_description(task_id)
            );
        };
        if cell.index >= max_id {
            add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
            bail!(
                "Cell {cell:?} no longer exists in task {} (index out of bounds)",
                ctx.get_task_description(task_id)
            );
        }

        let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
        if !new_listener {
            return Ok(Err(listener));
        }

        let _span = tracing::trace_span!(
            "recomputation",
            cell_type = registry::get_value_type_global_name(cell.type_id),
            cell_index = cell.index
        )
        .entered();

        if is_cancelled {
            bail!("{} was canceled", ctx.get_task_description(task_id));
        } else if !is_scheduled
            && task.add(CachedDataItem::new_scheduled(
                TaskExecutionReason::CellNotAvailable,
                || self.get_task_desc_fn(task_id),
            ))
        {
            ctx.schedule_task(task);
        }

        Ok(Err(listener))
    }

    fn listen_to_cell(
        &self,
        task: &mut impl TaskGuard,
        task_id: TaskId,
        reader: Option<TaskId>,
        cell: CellId,
    ) -> (EventListener, bool) {
        let note = move || {
            let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
            move || {
                if let Some(reader_desc) = reader_desc.as_ref() {
                    format!("try_read_task_cell (in progress) from {}", (reader_desc)())
                } else {
                    "try_read_task_cell (in progress, untracked)".to_string()
                }
            }
        };
        if let Some(in_progress) = get!(task, InProgressCell { cell }) {
            let listener = in_progress.event.listen_with_note(note);
            return (listener, false);
        }
        let in_progress = InProgressCellState::new(task_id, cell);
        let listener = in_progress.event.listen_with_note(note);
        task.add_new(CachedDataItem::InProgressCell {
            cell,
            value: in_progress,
        });
        (listener, true)
    }

    fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
        if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
            return Some(task_type);
        }
        if self.should_restore()
            && self.local_is_partial.load(Ordering::Acquire)
            && !task_id.is_transient()
            && let Some(task_type) = unsafe {
                self.backing_storage
                    .reverse_lookup_task_cache(None, task_id)
                    .expect("Failed to lookup task type")
            }
        {
            let _ = self.task_cache.try_insert(task_type.clone(), task_id);
            return Some(task_type);
        }
        None
    }

    fn get_task_desc_fn(&self, task_id: TaskId) -> impl Fn() -> String + Send + Sync + 'static {
        let task_type = self.lookup_task_type(task_id);
        move || {
            task_type.as_ref().map_or_else(
                || format!("{task_id:?} transient"),
                |task_type| format!("{task_id:?} {task_type}"),
            )
        }
    }

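    /// Takes a consistent snapshot of the in-memory state and persists it to
    /// the backing storage, after waiting for all in-flight operations to
    /// suspend. Returns the snapshot time and whether new items were written,
    /// or `None` if persisting failed.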
    fn snapshot(&self) -> Option<(Instant, bool)> {
        let start = Instant::now();
        debug_assert!(self.should_persist());
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = true;
        let active_operations = self
            .in_progress_operations
            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        if active_operations != 0 {
            self.operations_suspended
                .wait_while(&mut snapshot_request, |_| {
                    self.in_progress_operations.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT
                });
        }
        let suspended_operations = snapshot_request
            .suspended_operations
            .iter()
            .map(|op| op.arc().clone())
            .collect::<Vec<_>>();
        drop(snapshot_request);
        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
        self.storage.start_snapshot();
        let mut snapshot_request = self.snapshot_request.lock();
        snapshot_request.snapshot_requested = false;
        self.in_progress_operations
            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        self.snapshot_completed.notify_all();
        let snapshot_time = Instant::now();
        drop(snapshot_request);

        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
            if task_id.is_transient() {
                return (None, None);
            }
            let len = inner.len();

            let meta_restored = inner.state().meta_restored();
            let data_restored = inner.state().data_restored();

            let mut meta = meta_restored.then(|| Vec::with_capacity(len));
            let mut data = data_restored.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value))
                            }
                        }
                        _ => {}
                    }
                }
            }

            (meta, data)
        };
        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
            (
                task_id,
                meta.map(|d| self.backing_storage.serialize(task_id, &d)),
                data.map(|d| self.backing_storage.serialize(task_id, &d)),
            )
        };
        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
            if task_id.is_transient() {
                return (task_id, None, None);
            }
            let len = inner.len();
            let mut meta = inner.meta_modified.then(|| Vec::with_capacity(len));
            let mut data = inner.data_modified.then(|| Vec::with_capacity(len));
            for (key, value) in inner.iter_all() {
                if key.is_persistent() && value.is_persistent() {
                    match key.category() {
                        TaskDataCategory::Meta => {
                            if let Some(meta) = &mut meta {
                                meta.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        TaskDataCategory::Data => {
                            if let Some(data) = &mut data {
                                data.push(CachedDataItem::from_key_and_value_ref(key, value));
                            }
                        }
                        _ => {}
                    }
                }
            }
            (
                task_id,
                meta.map(|meta| self.backing_storage.serialize(task_id, &meta)),
                data.map(|data| self.backing_storage.serialize(task_id, &data)),
            )
        };

        let snapshot = {
            let _span = tracing::trace_span!("take snapshot");
            self.storage
                .take_snapshot(&preprocess, &process, &process_snapshot)
        };

        #[cfg(feature = "print_cache_item_size")]
        #[derive(Default)]
        struct TaskCacheStats {
            data: usize,
            data_count: usize,
            meta: usize,
            meta_count: usize,
        }
        #[cfg(feature = "print_cache_item_size")]
        impl TaskCacheStats {
            fn add_data(&mut self, len: usize) {
                self.data += len;
                self.data_count += 1;
            }

            fn add_meta(&mut self, len: usize) {
                self.meta += len;
                self.meta_count += 1;
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
            Mutex::new(FxHashMap::default());

        let task_snapshots = snapshot
            .into_iter()
            .filter_map(|iter| {
                let mut iter = iter
                    .filter_map(
                        |(task_id, meta, data): (
                            _,
                            Option<Result<SmallVec<_>>>,
                            Option<Result<SmallVec<_>>>,
                        )| {
                            let meta = match meta {
                                Some(Ok(meta)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_meta(meta.len());
                                    Some(meta)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (meta): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            let data = match data {
                                Some(Ok(data)) => {
                                    #[cfg(feature = "print_cache_item_size")]
                                    task_cache_stats
                                        .lock()
                                        .entry(self.get_task_description(task_id))
                                        .or_default()
                                        .add_data(data.len());
                                    Some(data)
                                }
                                None => None,
                                Some(Err(err)) => {
                                    println!(
                                        "Serializing task {} failed (data): {:?}",
                                        self.get_task_description(task_id),
                                        err
                                    );
                                    None
                                }
                            };
                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
                        },
                    )
                    .peekable();
                iter.peek().is_some().then_some(iter)
            })
            .collect::<Vec<_>>();

        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

        let mut new_items = false;

        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
            new_items = true;
            if let Err(err) = self.backing_storage.save_snapshot(
                self.session_id,
                suspended_operations,
                persisted_task_cache_log,
                task_snapshots,
            ) {
                println!("Persisting failed: {err:?}");
                return None;
            }
            #[cfg(feature = "print_cache_item_size")]
            {
                let mut task_cache_stats = task_cache_stats
                    .into_inner()
                    .into_iter()
                    .collect::<Vec<_>>();
                if !task_cache_stats.is_empty() {
                    task_cache_stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
                        (stats_b.data + stats_b.meta, key_b)
                            .cmp(&(stats_a.data + stats_a.meta, key_a))
                    });
                    println!("Task cache stats:");
                    for (task_desc, stats) in task_cache_stats {
                        use std::ops::Div;

                        use turbo_tasks::util::FormatBytes;

                        println!(
                            " {} {task_desc} = {} meta ({} x {}), {} data ({} x {})",
                            FormatBytes(stats.data + stats.meta),
                            FormatBytes(stats.meta),
                            stats.meta_count,
                            FormatBytes(stats.meta.checked_div(stats.meta_count).unwrap_or(0)),
                            FormatBytes(stats.data),
                            stats.data_count,
                            FormatBytes(stats.data.checked_div(stats.data_count).unwrap_or(0)),
                        );
                    }
                }
            }
        }

        if new_items {
            let elapsed = start.elapsed();
            turbo_tasks().send_compilation_event(Arc::new(TimingEvent::new(
                "Finished writing to persistent cache".to_string(),
                elapsed,
            )));
        }

        Some((snapshot_time, new_items))
    }

    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        if self.should_restore() {
            let uncompleted_operations = self
                .backing_storage
                .uncompleted_operations()
                .expect("Failed to get uncompleted operations");
            if !uncompleted_operations.is_empty() {
                let mut ctx = self.execute_context(turbo_tasks);
                for op in uncompleted_operations {
                    op.execute(&mut ctx);
                }
            }
        }

        if self.should_persist() {
            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
        }
    }

    fn stopping(&self) {
        self.stopping.store(true, Ordering::Release);
        self.stopping_event.notify(usize::MAX);
    }

    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        #[cfg(feature = "verify_aggregation_graph")]
        {
            self.is_idle.store(false, Ordering::Release);
            self.verify_aggregation_graph(turbo_tasks, false);
        }
        if self.should_persist() {
            self.snapshot();
        }
        self.task_cache.drop_contents();
        drop_contents(&self.transient_tasks);
        self.storage.drop_contents();
        if let Err(err) = self.backing_storage.shutdown() {
            println!("Shutting down failed: {err}");
        }
    }

    #[allow(unused_variables)]
    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
        self.idle_start_event.notify(usize::MAX);

        #[cfg(feature = "verify_aggregation_graph")]
        {
            use tokio::select;

            self.is_idle.store(true, Ordering::Release);
            let this = self.clone();
            let turbo_tasks = turbo_tasks.pin();
            tokio::task::spawn(async move {
                select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    }
                    _ = this.idle_end_event.listen() => {
                        return;
                    }
                }
                if !this.is_idle.load(Ordering::Relaxed) {
                    return;
                }
                this.verify_aggregation_graph(&*turbo_tasks, true);
            });
        }
    }

    fn idle_end(&self) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.is_idle.store(false, Ordering::Release);
        self.idle_end_event.notify(usize::MAX);
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let check_backing_storage =
            self.should_restore() && self.local_is_partial.load(Ordering::Acquire);
        let tx = check_backing_storage
            .then(|| self.backing_storage.start_read_transaction())
            .flatten();
        let task_id = {
            if let Some(task_id) = unsafe {
                check_backing_storage
                    .then(|| {
                        self.backing_storage
                            .forward_lookup_task_cache(tx.as_ref(), &task_type)
                            .expect("Failed to lookup task id")
                    })
                    .flatten()
            } {
                self.track_cache_hit(&task_type);
                let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
                task_id
            } else {
                let task_type = Arc::new(task_type);
                let task_id = self.persisted_task_id_factory.get();
                let task_id = if let Err(existing_task_id) =
                    self.task_cache.try_insert(task_type.clone(), task_id)
                {
                    self.track_cache_hit(&task_type);
                    unsafe {
                        self.persisted_task_id_factory.reuse(task_id);
                    }
                    existing_task_id
                } else {
                    self.track_cache_miss(&task_type);
                    task_id
                };
                if let Some(log) = &self.persisted_task_cache_log {
                    log.lock(task_id).push((task_type, task_id));
                }
                task_id
            }
        };

        unsafe { self.connect_child_with_tx(tx.as_ref(), parent_task, task_id, turbo_tasks) };

        task_id
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> TaskId {
        if !parent_task.is_transient() {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_task).as_deref(),
                Some(&task_type),
                None,
            );
        }
        if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
            self.track_cache_hit(&task_type);
            self.connect_child(parent_task, task_id, turbo_tasks);
            return task_id;
        }

        let task_type = Arc::new(task_type);
        let task_id = self.transient_task_id_factory.get();
        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
            self.track_cache_hit(&task_type);
            unsafe {
                self.transient_task_id_factory.reuse(task_id);
            }
            self.connect_child(parent_task, existing_task_id, turbo_tasks);
            return existing_task_id;
        }

        self.track_cache_miss(&task_type);
        self.connect_child(parent_task, task_id, turbo_tasks);

        task_id
    }

    fn debug_trace_transient_task(
        &self,
        task_type: &CachedTaskType,
        cell_id: Option<CellId>,
    ) -> DebugTraceTransientTask {
        fn inner_id(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_id: TaskId,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            if let Some(task_type) = backend.lookup_task_type(task_id) {
                if visited_set.contains(&task_id) {
                    let task_name = task_type.get_name();
                    DebugTraceTransientTask::Collapsed {
                        task_name,
                        cell_type_id,
                    }
                } else {
                    inner_cached(backend, &task_type, cell_type_id, visited_set)
                }
            } else {
                DebugTraceTransientTask::Uncached { cell_type_id }
            }
        }
        fn inner_cached(
            backend: &TurboTasksBackendInner<impl BackingStorage>,
            task_type: &CachedTaskType,
            cell_type_id: Option<ValueTypeId>,
            visited_set: &mut FxHashSet<TaskId>,
        ) -> DebugTraceTransientTask {
            let task_name = task_type.get_name();

            let cause_self = task_type.this.and_then(|cause_self_raw_vc| {
                let Some(task_id) = cause_self_raw_vc.try_get_task_id() else {
                    return None;
                };
                if task_id.is_transient() {
                    Some(Box::new(inner_id(
                        backend,
                        task_id,
                        cause_self_raw_vc.try_get_type_id(),
                        visited_set,
                    )))
                } else {
                    None
                }
            });
            let cause_args = task_type
                .arg
                .get_raw_vcs()
                .into_iter()
                .filter_map(|raw_vc| {
                    let Some(task_id) = raw_vc.try_get_task_id() else {
                        return None;
                    };
                    if !task_id.is_transient() {
                        return None;
                    }
                    Some((task_id, raw_vc.try_get_type_id()))
                })
                .collect::<IndexSet<_>>()
                .into_iter()
                .map(|(task_id, cell_type_id)| {
                    inner_id(backend, task_id, cell_type_id, visited_set)
                })
                .collect();

            DebugTraceTransientTask::Cached {
                task_name,
                cell_type_id,
                cause_self,
                cause_args,
            }
        }
        inner_cached(
            self,
            task_type,
            cell_id.map(|c| c.type_id),
            &mut FxHashSet::default(),
        )
    }

    fn invalidate_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            smallvec![task_id],
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Invalidator,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks(
        &self,
        tasks: &[TaskId],
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            panic!("Dependency tracking is disabled so invalidation is not allowed");
        }
        operation::InvalidateOperation::run(
            tasks.iter().copied().collect(),
            #[cfg(feature = "trace_task_dirty")]
            TaskDirtyCause::Unknown,
            self.execute_context(turbo_tasks),
        );
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if task_id.is_transient() {
            return;
        }
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        task.invalidate_serialization();
    }

    fn get_task_description(&self, task_id: TaskId) -> String {
        self.lookup_task_type(task_id).map_or_else(
            || format!("{task_id:?} transient"),
            |task_type| task_type.to_string(),
        )
    }

    fn task_execution_canceled(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(in_progress) = remove!(task, InProgress) {
            match in_progress {
                InProgressState::Scheduled {
                    done_event,
                    reason: _,
                } => done_event.notify(usize::MAX),
                InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
                    done_event.notify(usize::MAX)
                }
                InProgressState::Canceled => {}
            }
        }
        task.add_new(CachedDataItem::InProgress {
            value: InProgressState::Canceled,
        });
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Option<TaskExecutionSpec<'_>> {
        enum TaskType {
            Cached(Arc<CachedTaskType>),
            Transient(Arc<TransientTask>),
        }
        let (task_type, once_task) = if let Some(task_type) = self.lookup_task_type(task_id) {
            (TaskType::Cached(task_type), false)
        } else if let Some(task_type) = self.transient_tasks.get(&task_id) {
            (
                TaskType::Transient(task_type.clone()),
                matches!(**task_type, TransientTask::Once(_)),
            )
        } else {
            return None;
        };
        let execution_reason;
        {
            let mut ctx = self.execute_context(turbo_tasks);
            let mut task = ctx.task(task_id, TaskDataCategory::All);
            let in_progress = remove!(task, InProgress)?;
            let InProgressState::Scheduled { done_event, reason } = in_progress else {
                task.add_new(CachedDataItem::InProgress { value: in_progress });
                return None;
            };
            execution_reason = reason;
            task.add_new(CachedDataItem::InProgress {
                value: InProgressState::InProgress(Box::new(InProgressStateInner {
                    stale: false,
                    once_task,
                    done_event,
                    session_dependent: false,
                    marked_as_completed: false,
                    done: false,
                    new_children: Default::default(),
                })),
            });

            if self.should_track_children() {
                enum Collectible {
                    Current(CollectibleRef, i32),
                    Outdated(CollectibleRef),
                }
                let collectibles = iter_many!(task, Collectible { collectible } value => Collectible::Current(collectible, *value))
                    .chain(iter_many!(task, OutdatedCollectible { collectible } => Collectible::Outdated(collectible)))
                    .collect::<Vec<_>>();
                for collectible in collectibles {
                    match collectible {
                        Collectible::Current(collectible, value) => {
                            let _ = task
                                .insert(CachedDataItem::OutdatedCollectible { collectible, value });
                        }
                        Collectible::Outdated(collectible) => {
                            if !task.has_key(&CachedDataItemKey::Collectible { collectible }) {
                                task.remove(&CachedDataItemKey::OutdatedCollectible {
                                    collectible,
                                });
                            }
                        }
                    }
                }
            }

            if self.should_track_dependencies() {
                enum Dep {
                    CurrentCell(CellRef),
                    CurrentOutput(TaskId),
                    OutdatedCell(CellRef),
                    OutdatedOutput(TaskId),
                }
                let dependencies = iter_many!(task, CellDependency { target } => Dep::CurrentCell(target))
                    .chain(iter_many!(task, OutputDependency { target } => Dep::CurrentOutput(target)))
                    .chain(iter_many!(task, OutdatedCellDependency { target } => Dep::OutdatedCell(target)))
                    .chain(iter_many!(task, OutdatedOutputDependency { target } => Dep::OutdatedOutput(target)))
                    .collect::<Vec<_>>();
                for dep in dependencies {
                    match dep {
                        Dep::CurrentCell(cell) => {
                            let _ = task.add(CachedDataItem::OutdatedCellDependency {
                                target: cell,
                                value: (),
                            });
                        }
                        Dep::CurrentOutput(output) => {
                            let _ = task.add(CachedDataItem::OutdatedOutputDependency {
                                target: output,
                                value: (),
                            });
                        }
                        Dep::OutdatedCell(cell) => {
                            if !task.has_key(&CachedDataItemKey::CellDependency { target: cell }) {
                                task.remove(&CachedDataItemKey::OutdatedCellDependency {
                                    target: cell,
                                });
                            }
                        }
                        Dep::OutdatedOutput(output) => {
                            if !task
                                .has_key(&CachedDataItemKey::OutputDependency { target: output })
                            {
                                task.remove(&CachedDataItemKey::OutdatedOutputDependency {
                                    target: output,
                                });
                            }
                        }
                    }
                }
            }
        }

        let (span, future) = match task_type {
            TaskType::Cached(task_type) => {
                let CachedTaskType {
                    native_fn,
                    this,
                    arg,
                } = &*task_type;
                (
                    native_fn.span(task_id.persistence(), execution_reason),
                    native_fn.execute(*this, &**arg),
                )
            }
            TaskType::Transient(task_type) => {
                let span = tracing::trace_span!("turbo_tasks::root_task");
                let future = match &*task_type {
                    TransientTask::Root(f) => f(),
                    TransientTask::Once(future_mutex) => take(&mut *future_mutex.lock())?,
                };
                (span, future)
            }
        };
        Some(TaskExecutionSpec { future, span })
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
    }

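    /// Finalizes a task execution: marks the task as done, reconciles cell
    /// counters, cleans up outdated edges, connects the new children, and
    /// updates the aggregated dirty state. Returns `true` if the task became
    /// stale during execution and needs to be scheduled again.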
1707 fn task_execution_completed(
1708 &self,
1709 task_id: TaskId,
1710 _duration: Duration,
1711 _memory_usage: usize,
1712 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
1713 stateful: bool,
1714 has_invalidator: bool,
1715 turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1716 ) -> bool {
1717 let span = tracing::trace_span!("task execution completed", immutable = Empty).entered();
1732 let mut ctx = self.execute_context(turbo_tasks);
1733
1734 let mut task = ctx.task(task_id, TaskDataCategory::All);
1737 let Some(in_progress) = get_mut!(task, InProgress) else {
1738 panic!("Task execution completed, but task is not in progress: {task:#?}");
1739 };
1740 let &mut InProgressState::InProgress(box InProgressStateInner {
1741 stale,
1742 ref mut done,
1743 ref done_event,
1744 ref mut new_children,
1745 session_dependent,
1746 ..
1747 }) = in_progress
1748 else {
1749 panic!("Task execution completed, but task is not in progress: {task:#?}");
1750 };
1751
1752 #[cfg(not(feature = "no_fast_stale"))]
1754 if stale {
1755 let Some(InProgressState::InProgress(box InProgressStateInner {
1756 done_event,
1757 mut new_children,
1758 ..
1759 })) = remove!(task, InProgress)
1760 else {
1761 unreachable!();
1762 };
1763 task.add_new(CachedDataItem::InProgress {
1764 value: InProgressState::Scheduled {
1765 done_event,
1766 reason: TaskExecutionReason::Stale,
1767 },
1768 });
1769 for task in iter_many!(task, Child { task } => task) {
1772 new_children.remove(&task);
1773 }
1774 drop(task);
1775
1776 AggregationUpdateQueue::run(
1779 AggregationUpdateJob::DecreaseActiveCounts {
1780 task_ids: new_children.into_iter().collect(),
1781 },
1782 &mut ctx,
1783 );
1784 return true;
1785 }
1786
1787 if cfg!(not(feature = "no_fast_stale")) || !stale {
1788 *done = true;
1790 done_event.notify(usize::MAX);
1791 }
1792
1793 let mut new_children = take(new_children);
1795
1796 if stateful {
1798 let _ = task.add(CachedDataItem::Stateful { value: () });
1799 }
1800
1801 if has_invalidator {
1803 let _ = task.add(CachedDataItem::HasInvalidator { value: () });
1804 }
1805
1806 let old_counters: FxHashMap<_, _> =
1808 get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, *max_index));
1809 let mut counters_to_remove = old_counters.clone();
1810 for (&cell_type, &max_index) in cell_counters.iter() {
1811 if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
1812 if old_max_index != max_index {
1813 task.insert(CachedDataItem::CellTypeMaxIndex {
1814 cell_type,
1815 value: max_index,
1816 });
1817 }
1818 } else {
1819 task.add_new(CachedDataItem::CellTypeMaxIndex {
1820 cell_type,
1821 value: max_index,
1822 });
1823 }
1824 }
1825 for (cell_type, _) in counters_to_remove {
1826 task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1827 }
1828
1829 let mut queue = AggregationUpdateQueue::new();
1830
1831 let mut removed_data = Vec::new();
1832 let mut old_edges = Vec::new();
1833
1834 let has_children = !new_children.is_empty();
1835 let is_immutable = task.is_immutable();
1836 let task_dependencies_for_immutable =
1837 if !is_immutable
1839 && !session_dependent
1841 && !task.has_key(&CachedDataItemKey::HasInvalidator {})
1843 && !task.has_key(&CachedDataItemKey::Stateful {})
1845 && count!(task, CollectiblesDependency) == 0
1847 {
1848 Some(
1849 iter_many!(task, OutputDependency { target } => target)
1851 .chain(iter_many!(task, CellDependency { target } => target.task))
1852 .collect::<FxHashSet<_>>(),
1853 )
1854 } else {
1855 None
1856 };
1857
1858 if has_children {
1860 prepare_new_children(task_id, &mut task, &new_children, &mut queue);
1861 }
1862
1863 if has_children {
1865 old_edges.extend(
1866 iter_many!(task, Child { task } => task)
1867 .filter(|task| !new_children.remove(task))
1868 .map(OutdatedEdge::Child),
1869 );
1870 } else {
1871 old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
1872 }
1873
1874 if task_id.is_transient() || iter_many!(task, CellData { cell }
1879 if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
1880 ).count() > 0 {
1881 removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
1882 matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
1883 .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
1884 }));
1885 }
1886 if self.should_track_children() {
1887 old_edges.extend(
1888 task.iter(CachedDataItemType::OutdatedCollectible)
1889 .filter_map(|(key, value)| match (key, value) {
1890 (
1891 CachedDataItemKey::OutdatedCollectible { collectible },
1892 CachedDataItemValueRef::OutdatedCollectible { value },
1893 ) => Some(OutdatedEdge::Collectible(collectible, *value)),
1894 _ => None,
1895 }),
1896 );
1897 }
1898 if self.should_track_dependencies() {
1899 old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target)));
1900 old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
1901 old_edges.extend(
1902 iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map(
1903 |(cell, task)| {
1904 if cell_counters
1905 .get(&cell.type_id)
1906 .is_none_or(|start_index| cell.index >= *start_index)
1907 && let Some(old_counter) = old_counters.get(&cell.type_id)
1908 && cell.index < *old_counter
1909 {
1910 return Some(OutdatedEdge::RemovedCellDependent {
1911 task_id: task,
1912 #[cfg(feature = "trace_task_dirty")]
1913 value_type_id: cell.type_id,
1914 });
1915 }
1916 None
1917 },
1918 ),
1919 );
1920 }
1921
1922 drop(task);
1923
1924 let mut is_now_immutable = false;
1926 if let Some(dependencies) = task_dependencies_for_immutable
1927 && dependencies
1928 .iter()
1929 .all(|&task_id| ctx.task(task_id, TaskDataCategory::Data).is_immutable())
1930 {
1931 is_now_immutable = true;
1932 }
1933 span.record("immutable", is_immutable || is_now_immutable);
1934
1935 if !queue.is_empty() || !old_edges.is_empty() {
1936 #[cfg(feature = "trace_task_completion")]
1937 let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
1938 CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
1942 }
1943
1944 let mut task = ctx.task(task_id, TaskDataCategory::All);
1951 let Some(in_progress) = get!(task, InProgress) else {
1952 panic!("Task execution completed, but task is not in progress: {task:#?}");
1953 };
1954 let InProgressState::InProgress(box InProgressStateInner {
1955 #[cfg(not(feature = "no_fast_stale"))]
1956 stale,
1957 ..
1958 }) = in_progress
1959 else {
1960 panic!("Task execution completed, but task is not in progress: {task:#?}");
1961 };
1962
1963 #[cfg(not(feature = "no_fast_stale"))]
1965 if *stale {
1966 let Some(InProgressState::InProgress(box InProgressStateInner { done_event, .. })) =
1967 remove!(task, InProgress)
1968 else {
1969 unreachable!();
1970 };
1971 task.add_new(CachedDataItem::InProgress {
1972 value: InProgressState::Scheduled {
1973 done_event,
1974 reason: TaskExecutionReason::Stale,
1975 },
1976 });
1977 drop(task);
1978
1979 AggregationUpdateQueue::run(
1982 AggregationUpdateJob::DecreaseActiveCounts {
1983 task_ids: new_children.into_iter().collect(),
1984 },
1985 &mut ctx,
1986 );
1987 return true;
1988 }
1989
1990 if is_now_immutable {
1993 let _ = task.add(CachedDataItem::Immutable { value: () });
1994 }
1995
1996 let mut queue = AggregationUpdateQueue::new();
1997
1998 if has_children {
1999 let has_active_count = ctx.should_track_activeness()
2000 && get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
2001 connect_children(
2002 task_id,
2003 &mut task,
2004 new_children,
2005 &mut queue,
2006 has_active_count,
2007 ctx.should_track_activeness(),
2008 );
2009 }
2010
2011 drop(task);
2012
2013 if has_children {
2014 #[cfg(feature = "trace_task_completion")]
2015 let _span = tracing::trace_span!("connect new children").entered();
2016 queue.execute(&mut ctx);
2017 }
2018
2019 let mut task = ctx.task(task_id, TaskDataCategory::All);
2020 let Some(in_progress) = remove!(task, InProgress) else {
2021 panic!("Task execution completed, but task is not in progress: {task:#?}");
2022 };
2023 let InProgressState::InProgress(box InProgressStateInner {
2024 done_event,
2025 once_task: _,
2026 stale,
2027 session_dependent,
2028 done: _,
2029 marked_as_completed: _,
2030 new_children,
2031 }) = in_progress
2032 else {
2033 panic!("Task execution completed, but task is not in progress: {task:#?}");
2034 };
2035 debug_assert!(new_children.is_empty());
2036
2037 if stale {
2039 task.add_new(CachedDataItem::InProgress {
2040 value: InProgressState::Scheduled {
2041 done_event,
2042 reason: TaskExecutionReason::Stale,
2043 },
2044 });
2045 return true;
2046 }
2047
2048 removed_data.extend(task.extract_if(
2050 CachedDataItemType::InProgressCell,
2051 |key, value| match (key, value) {
2052 (
2053 CachedDataItemKey::InProgressCell { .. },
2054 CachedDataItemValueRef::InProgressCell { value },
2055 ) => {
2056 value.event.notify(usize::MAX);
2057 true
2058 }
2059 _ => false,
2060 },
2061 ));
2062
2063 let old_dirty_state = get!(task, Dirty).copied();
2065
2066 let new_dirty_state = if session_dependent {
2067 Some(DirtyState {
2068 clean_in_session: Some(self.session_id),
2069 })
2070 } else {
2071 None
2072 };
2073
        let data_update = if old_dirty_state != new_dirty_state {
            if let Some(new_dirty_state) = new_dirty_state {
                task.insert(CachedDataItem::Dirty {
                    value: new_dirty_state,
                });
            } else {
                task.remove(&CachedDataItemKey::Dirty {});
            }

            if self.should_track_children()
                && (old_dirty_state.is_some() || new_dirty_state.is_some())
            {
                let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
                    .cloned()
                    .unwrap_or_default();
                if let Some(old_dirty_state) = old_dirty_state {
                    dirty_containers.update_with_dirty_state(&old_dirty_state);
                }
                let aggregated_update = match (old_dirty_state, new_dirty_state) {
                    (None, None) => unreachable!(),
                    (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
                    (None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
                    (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
                };
                if !aggregated_update.is_zero() {
                    if aggregated_update.get(self.session_id) < 0
                        && let Some(activeness_state) = get_mut!(task, Activeness)
                    {
                        activeness_state.all_clean_event.notify(usize::MAX);
                        activeness_state.unset_active_until_clean();
                        if activeness_state.is_empty() {
                            task.remove(&CachedDataItemKey::Activeness {});
                        }
                    }
                    AggregationUpdateJob::data_update(
                        &mut task,
                        AggregatedDataUpdate::new()
                            .dirty_container_update(task_id, aggregated_update),
                    )
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        drop(task);

        if let Some(data_update) = data_update {
            AggregationUpdateQueue::run(data_update, &mut ctx);
        }

        drop(removed_data);

        let mut task = ctx.task(task_id, TaskDataCategory::All);
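        // Execution is finished; shrink the collections that tend to be overallocated
        // during execution to reduce memory usage.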
        task.shrink_to_fit(CachedDataItemType::CellData);
        task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
        task.shrink_to_fit(CachedDataItemType::CellDependency);
        task.shrink_to_fit(CachedDataItemType::OutputDependency);
        task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);
        drop(task);

        false
    }

    fn run_backend_job<'a>(
        self: &'a Arc<Self>,
        job: TurboTasksBackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            match job {
                TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
                    debug_assert!(self.should_persist());

                    let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
                    let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
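                    // Take snapshots in a loop: wait for the snapshot interval (or a short
                    // idle period) to elapse, persist a snapshot, and reschedule this job as
                    // a follow-up. Bail out as soon as the backend starts stopping.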
                    loop {
                        const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
                        const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
                        const IDLE_TIMEOUT: Duration = Duration::from_secs(2);

                        let time = if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
                            FIRST_SNAPSHOT_WAIT
                        } else {
                            SNAPSHOT_INTERVAL
                        };

                        let until = last_snapshot + time;
                        if until > Instant::now() {
                            let mut stop_listener = self.stopping_event.listen();
                            if self.stopping.load(Ordering::Acquire) {
                                return;
                            }
                            let mut idle_start_listener = self.idle_start_event.listen();
                            let mut idle_end_listener = self.idle_end_event.listen();
                            let mut idle_time = if turbo_tasks.is_idle() {
                                Instant::now() + IDLE_TIMEOUT
                            } else {
                                far_future()
                            };
                            loop {
                                tokio::select! {
                                    _ = &mut stop_listener => {
                                        return;
                                    },
                                    _ = &mut idle_start_listener => {
                                        idle_time = Instant::now() + IDLE_TIMEOUT;
                                        idle_start_listener = self.idle_start_event.listen()
                                    },
                                    _ = &mut idle_end_listener => {
                                        idle_time = until + IDLE_TIMEOUT;
                                        idle_end_listener = self.idle_end_event.listen()
                                    },
                                    _ = tokio::time::sleep_until(until) => {
                                        break;
                                    },
                                    _ = tokio::time::sleep_until(idle_time) => {
                                        if turbo_tasks.is_idle() {
                                            break;
                                        }
                                    },
                                }
                            }
                        }

                        let this = self.clone();
                        let snapshot = this.snapshot();
                        if let Some((snapshot_start, new_data)) = snapshot {
                            last_snapshot = snapshot_start;
                            if new_data {
                                continue;
                            }
                            let last_snapshot = last_snapshot.duration_since(self.start_time);
                            self.last_snapshot.store(
                                last_snapshot.as_millis().try_into().unwrap(),
                                Ordering::Relaxed,
                            );

                            turbo_tasks.schedule_backend_background_job(
                                TurboTasksBackendJob::FollowUpSnapshot,
                            );
                            return;
                        }
                    }
                }
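                // Prefetching touches the storage of a batch of tasks so later reads hit warm
                // data. Batches larger than 128 tasks are split into chunks scheduled as
                // separate foreground jobs, presumably so a single job doesn't occupy a
                // worker for too long.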
                TurboTasksBackendJob::Prefetch { data, range } => {
                    let range = if let Some(range) = range {
                        range
                    } else {
                        if data.len() > 128 {
                            let chunk_size = good_chunk_size(data.len());
                            let chunks = data.len().div_ceil(chunk_size);
                            for i in 0..chunks {
                                turbo_tasks.schedule_backend_foreground_job(
                                    TurboTasksBackendJob::Prefetch {
                                        data: data.clone(),
                                        range: Some(
                                            (i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
                                        ),
                                    },
                                );
                            }
                            return;
                        }
                        0..data.len()
                    };

                    let _span = info_span!("prefetching").entered();
                    let mut ctx = self.execute_context(turbo_tasks);
                    for i in range {
                        let (&task, &with_data) = data.get_index(i).unwrap();
                        let category = if with_data {
                            TaskDataCategory::All
                        } else {
                            TaskDataCategory::Meta
                        };
                        drop(ctx.task(task, category));
                    }
                }
            }
        })
    }

    fn try_read_own_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        _options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> Result<TypedCellContent> {
        let mut ctx = self.execute_context(turbo_tasks);
        let task = ctx.task(task_id, TaskDataCategory::Data);
        if let Some(content) = get!(task, CellData { cell }) {
            Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
        } else {
            Ok(CellContent(None).into_typed(cell.type_id))
        }
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        if !self.should_track_children() {
            return AutoMap::default();
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let mut collectibles = AutoMap::default();
        {
            let mut task = ctx.task(task_id, TaskDataCategory::All);
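            // Collectibles are read from the aggregated data, which is only complete on a
            // root (aggregating) node; raise the aggregation number until the task is one.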
            loop {
                let aggregation_number = get_aggregation_number(&task);
                if is_root_node(aggregation_number) {
                    break;
                }
                drop(task);
                AggregationUpdateQueue::run(
                    AggregationUpdateJob::UpdateAggregationNumber {
                        task_id,
                        base_aggregation_number: u32::MAX,
                        distance: None,
                    },
                    &mut ctx,
                );
                task = ctx.task(task_id, TaskDataCategory::All);
            }
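            // Sum the collectibles aggregated from descendants with the ones emitted by this
            // task itself, and register the reader as a dependent so it is invalidated when
            // they change.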
            for collectible in iter_many!(
                task,
                AggregatedCollectible {
                    collectible
                } count if collectible.collectible_type == collectible_type && *count > 0 => {
                    collectible.cell
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += 1;
            }
            for (collectible, count) in iter_many!(
                task,
                Collectible {
                    collectible
                } count if collectible.collectible_type == collectible_type => {
                    (collectible.cell, *count)
                }
            ) {
                *collectibles
                    .entry(RawVc::TaskCell(collectible.task, collectible.cell))
                    .or_insert(0) += count;
            }
            let _ = task.add(CachedDataItem::CollectiblesDependent {
                collectible_type,
                task: reader_id,
                value: (),
            });
        }
        {
            let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
            let target = CollectiblesRef {
                task: task_id,
                collectible_type,
            };
            if reader
                .remove(&CachedDataItemKey::OutdatedCollectiblesDependency { target })
                .is_none()
            {
                let _ = reader.add(CachedDataItem::CollectiblesDependency { target, value: () });
            }
        }
        collectibles
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);
        if !self.should_track_children() {
            return;
        }

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            1,
            self.execute_context(turbo_tasks),
        );
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_valid_collectible(task_id, collectible);
        if !self.should_track_children() {
            return;
        }

        let RawVc::TaskCell(collectible_task, cell) = collectible else {
            panic!("Collectibles need to be resolved");
        };
        let cell = CellRef {
            task: collectible_task,
            cell,
        };
        operation::UpdateCollectibleOperation::run(
            task_id,
            CollectibleRef {
                collectible_type,
                cell,
            },
            -(i32::try_from(count).unwrap()),
            self.execute_context(turbo_tasks),
        );
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        operation::UpdateCellOperation::run(
            task_id,
            cell,
            content,
            self.execute_context(turbo_tasks),
        );
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        if !self.should_track_dependencies() {
            return;
        }
        const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::Meta);
        let aggregation_number = get_aggregation_number(&task);
        if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
            drop(task);
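            // Make sure the task has a high aggregation number before flagging it as session
            // dependent (presumably to keep the repeated per-session invalidations cheap).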
            AggregationUpdateQueue::run(
                AggregationUpdateJob::UpdateAggregationNumber {
                    task_id,
                    base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
                    distance: None,
                },
                &mut ctx,
            );
            task = ctx.task(task_id, TaskDataCategory::Meta);
        }
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            session_dependent,
            ..
        })) = get_mut!(task, InProgress)
        {
            *session_dependent = true;
        }
    }

    fn mark_own_task_as_finished(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task, TaskDataCategory::Data);
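        // The flag is only recorded while the task is still executing; for a task that is no
        // longer in progress this call is a no-op.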
        if let Some(InProgressState::InProgress(box InProgressStateInner {
            marked_as_completed,
            ..
        })) = get_mut!(task, InProgress)
        {
            *marked_as_completed = true;
        }
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        let mut ctx = self.execute_context(turbo_tasks);
        AggregationUpdateQueue::run(
            AggregationUpdateJob::UpdateAggregationNumber {
                task_id: task,
                base_aggregation_number: aggregation_number,
                distance: None,
            },
            &mut ctx,
        );
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        self.assert_not_persistent_calling_transient(parent_task, task, None);
        ConnectChildOperation::run(parent_task, task, self.execute_context(turbo_tasks));
    }

    fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
        let task_id = self.transient_task_id_factory.get();
        let root_type = match task_type {
            TransientTaskType::Root(_) => RootType::RootTask,
            TransientTaskType::Once(_) => RootType::OnceTask,
        };
        self.transient_tasks.insert(
            task_id,
            Arc::new(match task_type {
                TransientTaskType::Root(f) => TransientTask::Root(f),
                TransientTaskType::Once(f) => TransientTask::Once(Mutex::new(Some(f))),
            }),
        );
        {
            let mut task = self.storage.access_mut(task_id);
            task.add(CachedDataItem::AggregationNumber {
                value: AggregationNumber {
                    base: u32::MAX,
                    distance: 0,
                    effective: u32::MAX,
                },
            });
            if self.should_track_activeness() {
                task.add(CachedDataItem::Activeness {
                    value: ActivenessState::new_root(root_type, task_id),
                });
            }
            task.add(CachedDataItem::new_scheduled(
                TaskExecutionReason::Initial,
                move || {
                    move || match root_type {
                        RootType::RootTask => "Root Task".to_string(),
                        RootType::OnceTask => "Once Task".to_string(),
                    }
                },
            ));
        }
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().insert(task_id);
        task_id
    }

    fn dispose_root_task(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
    ) {
        #[cfg(feature = "verify_aggregation_graph")]
        self.root_tasks.lock().remove(&task_id);

        let mut ctx = self.execute_context(turbo_tasks);
        let mut task = ctx.task(task_id, TaskDataCategory::All);
        let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
        let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
            .is_some_and(|dirty_containers| dirty_containers.get(self.session_id) > 0);
        if is_dirty || has_dirty_containers {
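            // Still dirty somewhere below: drop the root type but keep the task active until
            // everything is clean, at which point the activeness state can go away.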
            if let Some(activeness_state) = get_mut!(task, Activeness) {
                activeness_state.unset_root_type();
                activeness_state.set_active_until_clean();
            }
        } else if let Some(activeness_state) = remove!(task, Activeness) {
            activeness_state.all_clean_event.notify(usize::MAX);
        }
    }

    #[cfg(feature = "verify_aggregation_graph")]
    fn verify_aggregation_graph(
        &self,
        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
        idle: bool,
    ) {
        use std::{collections::VecDeque, env, io::stdout};

        use crate::backend::operation::{get_uppers, is_aggregating_node};

        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
            return;
        }

        let mut ctx = self.execute_context(turbo_tasks);
        let root_tasks = self.root_tasks.lock().clone();

        for task_id in root_tasks.into_iter() {
            let mut queue = VecDeque::new();
            let mut visited = FxHashSet::default();
            let mut aggregated_nodes = FxHashSet::default();
            let mut collectibles = FxHashMap::default();
            let root_task_id = task_id;
            visited.insert(task_id);
            aggregated_nodes.insert(task_id);
            queue.push_back(task_id);
            let mut counter = 0;
            while let Some(task_id) = queue.pop_front() {
                counter += 1;
                if counter % 100000 == 0 {
                    println!(
                        "queue={}, visited={}, aggregated_nodes={}",
                        queue.len(),
                        visited.len(),
                        aggregated_nodes.len()
                    );
                }
                let task = ctx.task(task_id, TaskDataCategory::All);
                if idle && !self.is_idle.load(Ordering::Relaxed) {
                    return;
                }

                let uppers = get_uppers(&task);
                if task_id != root_task_id
                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
                {
                    panic!(
                        "Task {} {} doesn't report to any root but is reachable from one \
                         (uppers: {:?})",
                        task_id,
                        ctx.get_task_description(task_id),
                        uppers
                    );
                }

                let aggregated_collectibles: Vec<_> = get_many!(
                    task,
                    AggregatedCollectible { collectible } value if *value > 0 => { collectible }
                );
                for collectible in aggregated_collectibles {
                    collectibles
                        .entry(collectible)
                        .or_insert_with(|| (false, Vec::new()))
                        .1
                        .push(task_id);
                }

                let own_collectibles: Vec<_> = get_many!(
                    task,
                    Collectible { collectible } value if *value > 0 => { collectible }
                );
                for collectible in own_collectibles {
                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
                        *flag = true
                    } else {
                        panic!(
                            "Task {} has a collectible {:?} that is not in any upper task",
                            task_id, collectible
                        );
                    }
                }

                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
                    .is_some_and(|count| count.get(self.session_id) > 0);
                let should_be_in_upper = is_dirty || has_dirty_container;

                let aggregation_number = get_aggregation_number(&task);
                if is_aggregating_node(aggregation_number) {
                    aggregated_nodes.insert(task_id);
                }
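                // Enqueue all children that haven't been visited yet.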
                for child_id in iter_many!(task, Child { task } => task) {
                    if visited.insert(child_id) {
                        queue.push_back(child_id);
                    }
                }
                drop(task);

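                // A dirty task must be recorded as a dirty container in each of its upper
                // aggregating tasks; otherwise invalidations could not propagate to the roots.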
                if should_be_in_upper {
                    for upper_id in uppers {
                        let task = ctx.task(upper_id, TaskDataCategory::All);
                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
                        if !in_upper {
                            panic!(
                                "Task {} ({}) is dirty, but is not listed in the upper task {} \
                                 ({})",
                                task_id,
                                ctx.get_task_description(task_id),
                                upper_id,
                                ctx.get_task_description(upper_id)
                            );
                        }
                    }
                }
            }

            for (collectible, (flag, task_ids)) in collectibles {
                if !flag {
                    use std::io::Write;
                    let mut stdout = stdout().lock();
                    writeln!(
                        stdout,
                        "{:?} that is not emitted in any child task but in these aggregated \
                         tasks: {:#?}",
                        collectible,
                        task_ids
                            .iter()
                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
                            .collect::<Vec<_>>()
                    )
                    .unwrap();

                    let task_id = collectible.cell.task;
                    let mut queue = {
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        get_uppers(&task)
                    };
                    let mut visited = FxHashSet::default();
                    for &upper_id in queue.iter() {
                        visited.insert(upper_id);
                        writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                    }
                    while let Some(task_id) = queue.pop() {
                        let desc = ctx.get_task_description(task_id);
                        let task = ctx.task(task_id, TaskDataCategory::All);
                        let aggregated_collectible =
                            get!(task, AggregatedCollectible { collectible })
                                .copied()
                                .unwrap_or_default();
                        let uppers = get_uppers(&task);
                        drop(task);
                        writeln!(
                            stdout,
                            "upper {task_id} {desc} collectible={aggregated_collectible}"
                        )
                        .unwrap();
                        if task_ids.contains(&task_id) {
                            writeln!(
                                stdout,
                                "Task has an upper connection to an aggregated task that \
                                 doesn't reference it. Upper connection is invalid!"
                            )
                            .unwrap();
                        }
                        for upper_id in uppers {
                            writeln!(stdout, "{task_id:?} -> {upper_id:?}").unwrap();
                            // Only enqueue uppers we haven't seen before, so nodes aren't
                            // processed repeatedly.
                            if visited.insert(upper_id) {
                                queue.push(upper_id);
                            }
                        }
                    }
                    panic!("See stdout for more details");
                }
            }
        }
    }

    fn assert_not_persistent_calling_transient(
        &self,
        parent_id: TaskId,
        child_id: TaskId,
        cell_id: Option<CellId>,
    ) {
        if !parent_id.is_transient() && child_id.is_transient() {
            self.panic_persistent_calling_transient(
                self.lookup_task_type(parent_id).as_deref(),
                self.lookup_task_type(child_id).as_deref(),
                cell_id,
            );
        }
    }

    fn panic_persistent_calling_transient(
        &self,
        parent: Option<&CachedTaskType>,
        child: Option<&CachedTaskType>,
        cell_id: Option<CellId>,
    ) {
        let transient_reason = if let Some(child) = child {
            Cow::Owned(format!(
                " The callee is transient because it depends on:\n{}",
                self.debug_trace_transient_task(child, cell_id),
            ))
        } else {
            Cow::Borrowed("")
        };
        panic!(
            "Persistent task {} is not allowed to call, read, or connect to the transient \
             task {}.{}",
            parent.map_or("unknown", |t| t.get_name()),
            child.map_or("unknown", |t| t.get_name()),
            transient_reason,
        );
    }

    fn assert_valid_collectible(&self, task_id: TaskId, collectible: RawVc) {
        let RawVc::TaskCell(col_task_id, col_cell_id) = collectible else {
            let task_info = if let Some(col_task_ty) = collectible
                .try_get_task_id()
                .and_then(|t| self.lookup_task_type(t))
            {
                Cow::Owned(format!(" (return type of {col_task_ty})"))
            } else {
                Cow::Borrowed("")
            };
            panic!("Collectible{task_info} must be a ResolvedVc")
        };
        if col_task_id.is_transient() && !task_id.is_transient() {
            let transient_reason = if let Some(col_task_ty) = self.lookup_task_type(col_task_id) {
                Cow::Owned(format!(
                    ". The collectible is transient because it depends on:\n{}",
                    self.debug_trace_transient_task(&col_task_ty, Some(col_cell_id)),
                ))
            } else {
                Cow::Borrowed("")
            };
            panic!(
                "Collectible is transient, transient collectibles cannot be emitted from \
                 persistent tasks{transient_reason}",
            )
        }
    }
}

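// `TurboTasksBackend` is a thin newtype around the inner implementation (`self.0`); the
// `Backend` trait implementation below delegates every method to it.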
impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.startup(turbo_tasks);
    }

    fn stopping(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stopping();
    }

    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.stop(turbo_tasks);
    }

    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_start(turbo_tasks);
    }

    fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.idle_end();
    }

    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_persistent_task(task_type, parent_task, turbo_tasks)
    }

    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0
            .get_or_create_transient_task(task_type, parent_task, turbo_tasks)
    }

    fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_task(task_id, turbo_tasks);
    }

    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.invalidate_tasks(tasks, turbo_tasks);
    }

    fn invalidate_tasks_set(
        &self,
        tasks: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_tasks_set(tasks, turbo_tasks);
    }

    fn invalidate_serialization(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.invalidate_serialization(task_id, turbo_tasks);
    }

    fn get_task_description(&self, task: TaskId) -> String {
        self.0.get_task_description(task)
    }

    type TaskState = ();
    fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}

    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.task_execution_canceled(task, turbo_tasks)
    }

    fn try_start_task_execution(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'_>> {
        self.0.try_start_task_execution(task_id, turbo_tasks)
    }

    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.task_execution_result(task_id, result, turbo_tasks);
    }

    fn task_execution_completed(
        &self,
        task_id: TaskId,
        _duration: Duration,
        _memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool {
        self.0.task_execution_completed(
            task_id,
            _duration,
            _memory_usage,
            cell_counters,
            stateful,
            has_invalidator,
            turbo_tasks,
        )
    }

    type BackendJob = TurboTasksBackendJob;

    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        self.0.run_backend_job(job, turbo_tasks)
    }

    fn try_read_task_output(
        &self,
        task_id: TaskId,
        reader: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, Some(reader), consistency, turbo_tasks)
    }

    fn try_read_task_output_untracked(
        &self,
        task_id: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>> {
        self.0
            .try_read_task_output(task_id, None, consistency, turbo_tasks)
    }

    fn try_read_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        reader: TaskId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, Some(reader), cell, options, turbo_tasks)
    }

    fn try_read_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.0
            .try_read_task_cell(task_id, None, cell, options, turbo_tasks)
    }

    fn try_read_own_task_cell_untracked(
        &self,
        task_id: TaskId,
        cell: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        self.0
            .try_read_own_task_cell_untracked(task_id, cell, options, turbo_tasks)
    }

    fn read_task_collectibles(
        &self,
        task_id: TaskId,
        collectible_type: TraitTypeId,
        reader: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
        self.0
            .read_task_collectibles(task_id, collectible_type, reader, turbo_tasks)
    }

    fn emit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .emit_collectible(collectible_type, collectible, task_id, turbo_tasks)
    }

    fn unemit_collectible(
        &self,
        collectible_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .unemit_collectible(collectible_type, collectible, count, task_id, turbo_tasks)
    }

    fn update_task_cell(
        &self,
        task_id: TaskId,
        cell: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.update_task_cell(task_id, cell, content, turbo_tasks);
    }

    fn mark_own_task_as_finished(
        &self,
        task_id: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_finished(task_id, turbo_tasks);
    }

    fn set_own_task_aggregation_number(
        &self,
        task: TaskId,
        aggregation_number: u32,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0
            .set_own_task_aggregation_number(task, aggregation_number, turbo_tasks);
    }

    fn mark_own_task_as_session_dependent(
        &self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
    }

    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
        self.0.connect_task(task, parent_task, turbo_tasks);
    }

    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId {
        self.0.create_transient_task(task_type)
    }

    fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
        self.0.dispose_root_task(task_id, turbo_tasks);
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        &self.0.task_statistics
    }
}

enum DebugTraceTransientTask {
    Cached {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
        cause_self: Option<Box<DebugTraceTransientTask>>,
        cause_args: Vec<DebugTraceTransientTask>,
    },
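    // A cached task whose causes are not shown, presumably because the task was already
    // expanded elsewhere in the trace (it renders with a "(collapsed)" suffix below).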
    Collapsed {
        task_name: &'static str,
        cell_type_id: Option<ValueTypeId>,
    },
    Uncached {
        cell_type_id: Option<ValueTypeId>,
    },
}

impl DebugTraceTransientTask {
    fn fmt_indented(&self, f: &mut fmt::Formatter<'_>, level: usize) -> fmt::Result {
        let indent = " ".repeat(level);
        f.write_str(&indent)?;

        fn fmt_cell_type_id(
            f: &mut fmt::Formatter<'_>,
            cell_type_id: Option<ValueTypeId>,
        ) -> fmt::Result {
            if let Some(ty) = cell_type_id {
                write!(f, " (read cell of type {})", get_value_type_global_name(ty))
            } else {
                Ok(())
            }
        }

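        // Write the task name (or a placeholder for uncached tasks) and the cell type.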
        match self {
            Self::Cached {
                task_name,
                cell_type_id,
                ..
            }
            | Self::Collapsed {
                task_name,
                cell_type_id,
                ..
            } => {
                f.write_str(task_name)?;
                fmt_cell_type_id(f, *cell_type_id)?;
                if matches!(self, Self::Collapsed { .. }) {
                    f.write_str(" (collapsed)")?;
                }
            }
            Self::Uncached { cell_type_id } => {
                f.write_str("unknown transient task")?;
                fmt_cell_type_id(f, *cell_type_id)?;
            }
        }
        f.write_char('\n')?;

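        // For cached tasks, recursively print what caused `self` and the arguments.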
        if let Self::Cached {
            cause_self,
            cause_args,
            ..
        } = self
        {
            if let Some(c) = cause_self {
                writeln!(f, "{indent} self:")?;
                c.fmt_indented(f, level + 1)?;
            }
            if !cause_args.is_empty() {
                writeln!(f, "{indent} args:")?;
                for c in cause_args {
                    c.fmt_indented(f, level + 1)?;
                }
            }
        }
        Ok(())
    }
}

impl fmt::Display for DebugTraceTransientTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.fmt_indented(f, 0)
    }
}

fn far_future() -> Instant {
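    // Roughly 30 years from now. `Instant` has no maximum value, and much larger offsets
    // can overflow on some platforms (tokio uses the same trick for far-future deadlines).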
    Instant::now() + Duration::from_secs(86400 * 365 * 30)
}