1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 pin::Pin,
8 sync::{
9 Arc, Mutex, RwLock, Weak,
10 atomic::{AtomicBool, AtomicUsize, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25
26use crate::{
27 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
28 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
29 VcValueTrait, VcValueType,
30 backend::{
31 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
32 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
33 },
34 capture_future::CaptureFuture,
35 event::{Event, EventListener},
36 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
37 id_factory::IdFactoryWithReuse,
38 keyed::KeyedEq,
39 macro_helpers::NativeFunction,
40 magic_any::MagicAny,
41 message_queue::{CompilationEvent, CompilationEventQueue},
42 priority_runner::{Executor, JoinHandle, PriorityRunner},
43 raw_vc::{CellId, RawVc},
44 registry,
45 serialization_invalidation::SerializationInvalidator,
46 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactory, StaticOrArc},
50};
51
52pub trait TurboTasksCallApi: Sync + Send {
55 fn dynamic_call(
58 &self,
59 native_fn: &'static NativeFunction,
60 this: Option<RawVc>,
61 arg: Box<dyn MagicAny>,
62 persistence: TaskPersistence,
63 ) -> RawVc;
64 fn native_call(
67 &self,
68 native_fn: &'static NativeFunction,
69 this: Option<RawVc>,
70 arg: Box<dyn MagicAny>,
71 persistence: TaskPersistence,
72 ) -> RawVc;
73 fn trait_call(
76 &self,
77 trait_method: &'static TraitMethod,
78 this: RawVc,
79 arg: Box<dyn MagicAny>,
80 persistence: TaskPersistence,
81 ) -> RawVc;
82
83 fn run(
84 &self,
85 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
86 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
87 fn run_once(
88 &self,
89 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
90 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
91 fn run_once_with_reason(
92 &self,
93 reason: StaticOrArc<dyn InvalidationReason>,
94 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
95 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
96 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
97
98 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
100
101 fn get_task_name(&self, task: TaskId) -> String;
103}
104
105pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
111 fn invalidate(&self, task: TaskId);
112 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
113
114 fn invalidate_serialization(&self, task: TaskId);
115
116 fn try_read_task_output(
117 &self,
118 task: TaskId,
119 options: ReadOutputOptions,
120 ) -> Result<Result<RawVc, EventListener>>;
121
122 fn try_read_task_cell(
123 &self,
124 task: TaskId,
125 index: CellId,
126 options: ReadCellOptions,
127 ) -> Result<Result<TypedCellContent, EventListener>>;
128
129 fn try_read_local_output(
144 &self,
145 execution_id: ExecutionId,
146 local_task_id: LocalTaskId,
147 ) -> Result<Result<RawVc, EventListener>>;
148
149 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
150
151 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
152 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
153 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
154
155 fn try_read_own_task_cell(
158 &self,
159 current_task: TaskId,
160 index: CellId,
161 options: ReadCellOptions,
162 ) -> Result<TypedCellContent>;
163
164 fn read_own_task_cell(
165 &self,
166 task: TaskId,
167 index: CellId,
168 options: ReadCellOptions,
169 ) -> Result<TypedCellContent>;
170 fn update_own_task_cell(
171 &self,
172 task: TaskId,
173 index: CellId,
174 is_serializable_cell_content: bool,
175 content: CellContent,
176 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
177 verification_mode: VerificationMode,
178 );
179 fn mark_own_task_as_finished(&self, task: TaskId);
180 fn mark_own_task_as_session_dependent(&self, task: TaskId);
181
182 fn connect_task(&self, task: TaskId);
183
184 fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
189
190 fn task_statistics(&self) -> &TaskStatisticsApi;
191
192 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
193
194 fn subscribe_to_compilation_events(
195 &self,
196 event_types: Option<Vec<String>>,
197 ) -> Receiver<Arc<dyn CompilationEvent>>;
198
199 fn is_tracking_dependencies(&self) -> bool;
201}
202
203pub struct Unused<T> {
205 inner: T,
206}
207
208impl<T> Unused<T> {
209 pub unsafe fn new_unchecked(inner: T) -> Self {
215 Self { inner }
216 }
217
218 pub unsafe fn get_unchecked(&self) -> &T {
224 &self.inner
225 }
226
227 pub fn into(self) -> T {
229 self.inner
230 }
231}
232
233pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
235 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
236
237 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
238 fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
239 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
243 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
247
248 fn schedule(&self, task: TaskId, priority: TaskPriority);
250
251 fn get_current_task_priority(&self) -> TaskPriority;
253
254 fn schedule_backend_foreground_job(&self, job: B::BackendJob);
256
257 fn schedule_backend_background_job(&self, job: B::BackendJob);
262
263 fn program_duration_until(&self, instant: Instant) -> Duration;
265
266 fn is_idle(&self) -> bool;
268
269 fn backend(&self) -> &B;
271}
272
273#[allow(clippy::manual_non_exhaustive)]
274pub struct UpdateInfo {
275 pub duration: Duration,
276 pub tasks: usize,
277 pub reasons: InvalidationReasonSet,
278 #[allow(dead_code)]
279 placeholder_for_future_fields: (),
280}
281
282#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
283pub enum TaskPersistence {
284 Persistent,
286
287 Transient,
294}
295
296impl Display for TaskPersistence {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 match self {
299 TaskPersistence::Persistent => write!(f, "persistent"),
300 TaskPersistence::Transient => write!(f, "transient"),
301 }
302 }
303}
304
305#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
306pub enum ReadConsistency {
307 #[default]
310 Eventual,
311 Strong,
316}
317
318#[derive(Clone, Copy, Debug, Eq, PartialEq)]
319pub enum ReadCellTracking {
320 Tracked {
322 key: Option<u64>,
324 },
325 TrackOnlyError,
330 Untracked,
335}
336
337impl ReadCellTracking {
338 pub fn should_track(&self, is_err: bool) -> bool {
339 match self {
340 ReadCellTracking::Tracked { .. } => true,
341 ReadCellTracking::TrackOnlyError => is_err,
342 ReadCellTracking::Untracked => false,
343 }
344 }
345
346 pub fn key(&self) -> Option<u64> {
347 match self {
348 ReadCellTracking::Tracked { key } => *key,
349 ReadCellTracking::TrackOnlyError => None,
350 ReadCellTracking::Untracked => None,
351 }
352 }
353}
354
355impl Default for ReadCellTracking {
356 fn default() -> Self {
357 ReadCellTracking::Tracked { key: None }
358 }
359}
360
361impl Display for ReadCellTracking {
362 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 match self {
364 ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
365 ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
366 ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
367 ReadCellTracking::Untracked => write!(f, "untracked"),
368 }
369 }
370}
371
372#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
373pub enum ReadTracking {
374 #[default]
376 Tracked,
377 TrackOnlyError,
382 Untracked,
387}
388
389impl ReadTracking {
390 pub fn should_track(&self, is_err: bool) -> bool {
391 match self {
392 ReadTracking::Tracked => true,
393 ReadTracking::TrackOnlyError => is_err,
394 ReadTracking::Untracked => false,
395 }
396 }
397}
398
399impl Display for ReadTracking {
400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401 match self {
402 ReadTracking::Tracked => write!(f, "tracked"),
403 ReadTracking::TrackOnlyError => write!(f, "track only error"),
404 ReadTracking::Untracked => write!(f, "untracked"),
405 }
406 }
407}
408
409#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
410pub enum TaskPriority {
411 #[default]
412 Initial,
413 Invalidation {
414 priority: Reverse<u32>,
415 },
416}
417
418impl TaskPriority {
419 pub fn invalidation(priority: u32) -> Self {
420 Self::Invalidation {
421 priority: Reverse(priority),
422 }
423 }
424
425 pub fn initial() -> Self {
426 Self::Initial
427 }
428
429 pub fn leaf() -> Self {
430 Self::Invalidation {
431 priority: Reverse(0),
432 }
433 }
434
435 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
436 match self {
437 TaskPriority::Initial => parent_priority,
438 TaskPriority::Invalidation { priority } => {
439 if let TaskPriority::Invalidation {
440 priority: parent_priority,
441 } = parent_priority
442 && priority.0 < parent_priority.0
443 {
444 Self::Invalidation {
445 priority: Reverse(parent_priority.0.saturating_add(1)),
446 }
447 } else {
448 *self
449 }
450 }
451 }
452 }
453}
454
455impl Display for TaskPriority {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 match self {
458 TaskPriority::Initial => write!(f, "initial"),
459 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
460 }
461 }
462}
463
464enum ScheduledTask {
465 Task {
466 task_id: TaskId,
467 span: Span,
468 },
469 LocalTask {
470 ty: LocalTaskSpec,
471 persistence: TaskPersistence,
472 local_task_id: LocalTaskId,
473 global_task_state: Arc<RwLock<CurrentTaskState>>,
474 span: Span,
475 },
476}
477
478pub struct TurboTasks<B: Backend + 'static> {
479 this: Weak<Self>,
480 backend: B,
481 task_id_factory: IdFactoryWithReuse<TaskId>,
482 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
483 execution_id_factory: IdFactory<ExecutionId>,
484 stopped: AtomicBool,
485 currently_scheduled_foreground_jobs: AtomicUsize,
486 currently_scheduled_background_jobs: AtomicUsize,
487 scheduled_tasks: AtomicUsize,
488 priority_runner:
489 Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
490 start: Mutex<Option<Instant>>,
491 aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
492 event_foreground_start: Event,
494 event_foreground_done: Event,
497 event_background_done: Event,
499 program_start: Instant,
500 compilation_events: CompilationEventQueue,
501}
502
503type LocalTaskTracker = Option<
504 FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
505>;
506
507struct CurrentTaskState {
516 task_id: Option<TaskId>,
517 execution_id: ExecutionId,
518 priority: TaskPriority,
519
520 #[cfg(feature = "verify_determinism")]
523 stateful: bool,
524
525 has_invalidator: bool,
527
528 in_top_level_task: bool,
531
532 cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
537
538 local_tasks: Vec<LocalTask>,
540
541 local_task_tracker: LocalTaskTracker,
544}
545
546impl CurrentTaskState {
547 fn new(
548 task_id: TaskId,
549 execution_id: ExecutionId,
550 priority: TaskPriority,
551 in_top_level_task: bool,
552 ) -> Self {
553 Self {
554 task_id: Some(task_id),
555 execution_id,
556 priority,
557 #[cfg(feature = "verify_determinism")]
558 stateful: false,
559 has_invalidator: false,
560 in_top_level_task,
561 cell_counters: Some(AutoMap::default()),
562 local_tasks: Vec::new(),
563 local_task_tracker: None,
564 }
565 }
566
567 fn new_temporary(
568 execution_id: ExecutionId,
569 priority: TaskPriority,
570 in_top_level_task: bool,
571 ) -> Self {
572 Self {
573 task_id: None,
574 execution_id,
575 priority,
576 #[cfg(feature = "verify_determinism")]
577 stateful: false,
578 has_invalidator: false,
579 in_top_level_task,
580 cell_counters: None,
581 local_tasks: Vec::new(),
582 local_task_tracker: None,
583 }
584 }
585
586 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
587 if self.execution_id != expected_execution_id {
588 panic!(
589 "Local tasks can only be scheduled/awaited within the same execution of the \
590 parent task that created them"
591 );
592 }
593 }
594
595 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
596 self.local_tasks.push(local_task);
597 if cfg!(debug_assertions) {
599 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
600 } else {
601 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
602 }
603 }
604
605 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
606 &self.local_tasks[(*local_task_id as usize) - 1]
608 }
609
610 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
611 &mut self.local_tasks[(*local_task_id as usize) - 1]
612 }
613}
614
615task_local! {
617 static TURBO_TASKS: Arc<dyn TurboTasksApi>;
619
620 static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
621
622 pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
627}
628
629impl<B: Backend + 'static> TurboTasks<B> {
630 pub fn new(backend: B) -> Arc<Self> {
636 let task_id_factory = IdFactoryWithReuse::new(
637 TaskId::MIN,
638 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
639 );
640 let transient_task_id_factory =
641 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
642 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
643 let this = Arc::new_cyclic(|this| Self {
644 this: this.clone(),
645 backend,
646 task_id_factory,
647 transient_task_id_factory,
648 execution_id_factory,
649 stopped: AtomicBool::new(false),
650 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
651 currently_scheduled_background_jobs: AtomicUsize::new(0),
652 scheduled_tasks: AtomicUsize::new(0),
653 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
654 start: Default::default(),
655 aggregated_update: Default::default(),
656 event_foreground_done: Event::new(|| {
657 || "TurboTasks::event_foreground_done".to_string()
658 }),
659 event_foreground_start: Event::new(|| {
660 || "TurboTasks::event_foreground_start".to_string()
661 }),
662 event_background_done: Event::new(|| {
663 || "TurboTasks::event_background_done".to_string()
664 }),
665 program_start: Instant::now(),
666 compilation_events: CompilationEventQueue::default(),
667 });
668 this.backend.startup(&*this);
669 this
670 }
671
672 pub fn pin(&self) -> Arc<Self> {
673 self.this.upgrade().unwrap()
674 }
675
676 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
678 where
679 T: ?Sized,
680 F: Fn() -> Fut + Send + Sync + Clone + 'static,
681 Fut: Future<Output = Result<Vc<T>>> + Send,
682 {
683 let id = self.backend.create_transient_task(
684 TransientTaskType::Root(Box::new(move || {
685 let functor = functor.clone();
686 Box::pin(async move {
687 let raw_vc = functor().await?.node;
688 raw_vc.to_non_local().await
689 })
690 })),
691 self,
692 );
693 self.schedule(id, TaskPriority::initial());
694 id
695 }
696
697 pub fn dispose_root_task(&self, task_id: TaskId) {
698 self.backend.dispose_root_task(task_id, self);
699 }
700
701 #[track_caller]
705 fn spawn_once_task<T, Fut>(&self, future: Fut)
706 where
707 T: ?Sized,
708 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
709 {
710 let id = self.backend.create_transient_task(
711 TransientTaskType::Once(Box::pin(async move {
712 let raw_vc = future.await?.node;
713 raw_vc.to_non_local().await
714 })),
715 self,
716 );
717 self.schedule(id, TaskPriority::initial());
718 }
719
720 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
721 &self,
722 future: impl Future<Output = Result<T>> + Send + 'static,
723 ) -> Result<T> {
724 let (tx, rx) = tokio::sync::oneshot::channel();
725 self.spawn_once_task(async move {
726 mark_top_level_task();
727 let result = future.await;
728 tx.send(result)
729 .map_err(|_| anyhow!("unable to send result"))?;
730 Ok(Completion::new())
731 });
732
733 rx.await?
734 }
735
736 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
737 pub async fn run<T: TraceRawVcs + Send + 'static>(
738 &self,
739 future: impl Future<Output = Result<T>> + Send + 'static,
740 ) -> Result<T, TurboTasksExecutionError> {
741 self.begin_foreground_job();
742 let execution_id = self.execution_id_factory.wrapping_get();
744 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
745 execution_id,
746 TaskPriority::initial(),
747 true, )));
749
750 let result = TURBO_TASKS
751 .scope(
752 self.pin(),
753 CURRENT_TASK_STATE.scope(current_task_state, async {
754 let result = CaptureFuture::new(future).await;
755
756 wait_for_local_tasks().await;
758
759 match result {
760 Ok(Ok(raw_vc)) => Ok(raw_vc),
761 Ok(Err(err)) => Err(err.into()),
762 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
763 }
764 }),
765 )
766 .await;
767 self.finish_foreground_job();
768 result
769 }
770
771 pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
772 let this = self.pin();
773 tokio::spawn(async move {
774 this.pin()
775 .run_once(async move {
776 this.finish_foreground_job();
777 future.await;
778 this.begin_foreground_job();
779 Ok(())
780 })
781 .await
782 .unwrap()
783 });
784 }
785
786 pub(crate) fn native_call(
787 &self,
788 native_fn: &'static NativeFunction,
789 this: Option<RawVc>,
790 arg: Box<dyn MagicAny>,
791 persistence: TaskPersistence,
792 ) -> RawVc {
793 let task_type = CachedTaskType {
794 native_fn,
795 this,
796 arg,
797 };
798 RawVc::TaskOutput(match persistence {
799 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
800 task_type,
801 current_task_if_available("turbo_function calls"),
802 self,
803 ),
804 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
805 task_type,
806 current_task_if_available("turbo_function calls"),
807 self,
808 ),
809 })
810 }
811
812 pub fn dynamic_call(
813 &self,
814 native_fn: &'static NativeFunction,
815 this: Option<RawVc>,
816 arg: Box<dyn MagicAny>,
817 persistence: TaskPersistence,
818 ) -> RawVc {
819 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
820 return self.native_call(native_fn, this, arg, persistence);
821 }
822 let task_type = LocalTaskSpec {
823 task_type: LocalTaskType::ResolveNative { native_fn },
824 this,
825 arg,
826 };
827 self.schedule_local_task(task_type, persistence)
828 }
829
830 pub fn trait_call(
831 &self,
832 trait_method: &'static TraitMethod,
833 this: RawVc,
834 arg: Box<dyn MagicAny>,
835 persistence: TaskPersistence,
836 ) -> RawVc {
837 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
841 match registry::get_value_type(type_id).get_trait_method(trait_method) {
842 Some(native_fn) => {
843 let arg = native_fn.arg_meta.filter_owned(arg);
844 return self.dynamic_call(native_fn, Some(this), arg, persistence);
845 }
846 None => {
847 }
851 }
852 }
853
854 let task_type = LocalTaskSpec {
856 task_type: LocalTaskType::ResolveTrait { trait_method },
857 this: Some(this),
858 arg,
859 };
860
861 self.schedule_local_task(task_type, persistence)
862 }
863
864 #[track_caller]
865 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
866 self.begin_foreground_job();
867 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
868
869 self.priority_runner.schedule(
870 &self.pin(),
871 ScheduledTask::Task {
872 task_id,
873 span: Span::current(),
874 },
875 priority,
876 );
877 }
878
879 fn schedule_local_task(
880 &self,
881 ty: LocalTaskSpec,
882 persistence: TaskPersistence,
884 ) -> RawVc {
885 let task_type = ty.task_type;
886 let (global_task_state, execution_id, priority, local_task_id) =
887 CURRENT_TASK_STATE.with(|gts| {
888 let mut gts_write = gts.write().unwrap();
889 let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
890 done_event: Event::new(move || {
891 move || format!("LocalTask({task_type})::done_event")
892 }),
893 });
894 (
895 Arc::clone(gts),
896 gts_write.execution_id,
897 gts_write.priority,
898 local_task_id,
899 )
900 });
901
902 let future = self.priority_runner.schedule_with_join_handle(
903 &self.pin(),
904 ScheduledTask::LocalTask {
905 ty,
906 persistence,
907 local_task_id,
908 global_task_state: global_task_state.clone(),
909 span: Span::current(),
910 },
911 priority,
912 );
913 global_task_state
914 .write()
915 .unwrap()
916 .local_task_tracker
917 .get_or_insert_default()
918 .push(Either::Left(future));
919
920 RawVc::LocalOutput(execution_id, local_task_id, persistence)
921 }
922
923 fn begin_foreground_job(&self) {
924 if self
925 .currently_scheduled_foreground_jobs
926 .fetch_add(1, Ordering::AcqRel)
927 == 0
928 {
929 *self.start.lock().unwrap() = Some(Instant::now());
930 self.event_foreground_start.notify(usize::MAX);
931 self.backend.idle_end(self);
932 }
933 }
934
935 fn finish_foreground_job(&self) {
936 if self
937 .currently_scheduled_foreground_jobs
938 .fetch_sub(1, Ordering::AcqRel)
939 == 1
940 {
941 self.backend.idle_start(self);
942 let total = self.scheduled_tasks.load(Ordering::Acquire);
945 self.scheduled_tasks.store(0, Ordering::Release);
946 if let Some(start) = *self.start.lock().unwrap() {
947 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
948 if let Some(update) = update.as_mut() {
949 update.0 += start.elapsed();
950 update.1 += total;
951 } else {
952 *update = Some((start.elapsed(), total));
953 }
954 }
955 self.event_foreground_done.notify(usize::MAX);
956 }
957 }
958
959 fn begin_background_job(&self) {
960 self.currently_scheduled_background_jobs
961 .fetch_add(1, Ordering::Relaxed);
962 }
963
964 fn finish_background_job(&self) {
965 if self
966 .currently_scheduled_background_jobs
967 .fetch_sub(1, Ordering::Relaxed)
968 == 1
969 {
970 self.event_background_done.notify(usize::MAX);
971 }
972 }
973
974 pub fn get_in_progress_count(&self) -> usize {
975 self.currently_scheduled_foreground_jobs
976 .load(Ordering::Acquire)
977 }
978
979 pub async fn wait_task_completion(
991 &self,
992 id: TaskId,
993 consistency: ReadConsistency,
994 ) -> Result<()> {
995 read_task_output(
996 self,
997 id,
998 ReadOutputOptions {
999 tracking: ReadTracking::Untracked,
1001 consistency,
1002 },
1003 )
1004 .await?;
1005 Ok(())
1006 }
1007
1008 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
1011 self.aggregated_update_info(aggregation, Duration::MAX)
1012 .await
1013 .unwrap()
1014 }
1015
1016 pub async fn aggregated_update_info(
1020 &self,
1021 aggregation: Duration,
1022 timeout: Duration,
1023 ) -> Option<UpdateInfo> {
1024 let listener = self
1025 .event_foreground_done
1026 .listen_with_note(|| || "wait for update info".to_string());
1027 let wait_for_finish = {
1028 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1029 if aggregation.is_zero() {
1030 if let Some((duration, tasks)) = update.take() {
1031 return Some(UpdateInfo {
1032 duration,
1033 tasks,
1034 reasons: take(reason_set),
1035 placeholder_for_future_fields: (),
1036 });
1037 } else {
1038 true
1039 }
1040 } else {
1041 update.is_none()
1042 }
1043 };
1044 if wait_for_finish {
1045 if timeout == Duration::MAX {
1046 listener.await;
1048 } else {
1049 let start_listener = self
1051 .event_foreground_start
1052 .listen_with_note(|| || "wait for update info".to_string());
1053 if self
1054 .currently_scheduled_foreground_jobs
1055 .load(Ordering::Acquire)
1056 == 0
1057 {
1058 start_listener.await;
1059 } else {
1060 drop(start_listener);
1061 }
1062 if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1063 return None;
1065 }
1066 }
1067 }
1068 if !aggregation.is_zero() {
1069 loop {
1070 select! {
1071 () = tokio::time::sleep(aggregation) => {
1072 break;
1073 }
1074 () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1075 }
1077 }
1078 }
1079 }
1080 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1081 if let Some((duration, tasks)) = update.take() {
1082 Some(UpdateInfo {
1083 duration,
1084 tasks,
1085 reasons: take(reason_set),
1086 placeholder_for_future_fields: (),
1087 })
1088 } else {
1089 panic!("aggregated_update_info must not called concurrently")
1090 }
1091 }
1092
1093 pub async fn wait_background_done(&self) {
1094 let listener = self.event_background_done.listen();
1095 if self
1096 .currently_scheduled_background_jobs
1097 .load(Ordering::Acquire)
1098 != 0
1099 {
1100 listener.await;
1101 }
1102 }
1103
1104 pub async fn stop_and_wait(&self) {
1105 turbo_tasks_future_scope(self.pin(), async move {
1106 self.backend.stopping(self);
1107 self.stopped.store(true, Ordering::Release);
1108 {
1109 let listener = self
1110 .event_foreground_done
1111 .listen_with_note(|| || "wait for stop".to_string());
1112 if self
1113 .currently_scheduled_foreground_jobs
1114 .load(Ordering::Acquire)
1115 != 0
1116 {
1117 listener.await;
1118 }
1119 }
1120 {
1121 let listener = self.event_background_done.listen();
1122 if self
1123 .currently_scheduled_background_jobs
1124 .load(Ordering::Acquire)
1125 != 0
1126 {
1127 listener.await;
1128 }
1129 }
1130 self.backend.stop(self);
1131 })
1132 .await;
1133 }
1134
1135 #[track_caller]
1136 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1137 where
1138 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1139 T::CallOnceFuture: Send,
1140 {
1141 let mut this = self.pin();
1142 this.begin_foreground_job();
1143 tokio::spawn(
1144 TURBO_TASKS
1145 .scope(this.clone(), async move {
1146 if !this.stopped.load(Ordering::Acquire) {
1147 this = func(this.clone()).await;
1148 }
1149 this.finish_foreground_job();
1150 })
1151 .in_current_span(),
1152 );
1153 }
1154
1155 #[track_caller]
1156 pub(crate) fn schedule_background_job<T>(&self, func: T)
1157 where
1158 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1159 T::CallOnceFuture: Send,
1160 {
1161 let mut this = self.pin();
1162 self.begin_background_job();
1163 tokio::spawn(
1164 TURBO_TASKS
1165 .scope(this.clone(), async move {
1166 if !this.stopped.load(Ordering::Acquire) {
1167 this = func(this).await;
1168 }
1169 this.finish_background_job();
1170 })
1171 .in_current_span(),
1172 );
1173 }
1174
1175 fn finish_current_task_state(&self) -> FinishedTaskState {
1176 CURRENT_TASK_STATE.with(|cell| {
1177 let current_task_state = &*cell.write().unwrap();
1178 FinishedTaskState {
1179 #[cfg(feature = "verify_determinism")]
1180 stateful: current_task_state.stateful,
1181 has_invalidator: current_task_state.has_invalidator,
1182 }
1183 })
1184 }
1185
1186 pub fn backend(&self) -> &B {
1187 &self.backend
1188 }
1189}
1190
1191struct TurboTasksExecutor;
1192
1193impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
1194 type Future = impl Future<Output = ()> + Send + 'static;
1195
1196 fn execute(
1197 &self,
1198 this: &Arc<TurboTasks<B>>,
1199 scheduled_task: ScheduledTask,
1200 priority: TaskPriority,
1201 ) -> Self::Future {
1202 match scheduled_task {
1203 ScheduledTask::Task { task_id, span } => {
1204 let this2 = this.clone();
1205 let this = this.clone();
1206 let future = async move {
1207 let mut schedule_again = true;
1208 while schedule_again {
1209 let execution_id = this.execution_id_factory.wrapping_get();
1212 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1213 task_id,
1214 execution_id,
1215 priority,
1216 false, )));
1218 let single_execution_future = async {
1219 if this.stopped.load(Ordering::Acquire) {
1220 this.backend.task_execution_canceled(task_id, &*this);
1221 return false;
1222 }
1223
1224 let Some(TaskExecutionSpec { future, span }) = this
1225 .backend
1226 .try_start_task_execution(task_id, priority, &*this)
1227 else {
1228 return false;
1229 };
1230
1231 async {
1232 let result = CaptureFuture::new(future).await;
1233
1234 wait_for_local_tasks().await;
1236
1237 let result = match result {
1238 Ok(Ok(raw_vc)) => Ok(raw_vc),
1239 Ok(Err(err)) => Err(err.into()),
1240 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
1241 };
1242
1243 let finihed_state = this.finish_current_task_state();
1244 let cell_counters = CURRENT_TASK_STATE
1245 .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
1246 this.backend.task_execution_completed(
1247 task_id,
1248 result,
1249 &cell_counters,
1250 #[cfg(feature = "verify_determinism")]
1251 finihed_state.stateful,
1252 finihed_state.has_invalidator,
1253 &*this,
1254 )
1255 }
1256 .instrument(span)
1257 .await
1258 };
1259 schedule_again = CURRENT_TASK_STATE
1260 .scope(current_task_state, single_execution_future)
1261 .await;
1262 }
1263 this.finish_foreground_job();
1264 };
1265
1266 Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
1267 }
1268 ScheduledTask::LocalTask {
1269 ty,
1270 persistence,
1271 local_task_id,
1272 global_task_state,
1273 span,
1274 } => {
1275 let this2 = this.clone();
1276 let this = this.clone();
1277 let task_type = ty.task_type;
1278 let future = async move {
1279 let span = match &ty.task_type {
1280 LocalTaskType::ResolveNative { native_fn } => {
1281 native_fn.resolve_span(priority)
1282 }
1283 LocalTaskType::ResolveTrait { trait_method } => {
1284 trait_method.resolve_span(priority)
1285 }
1286 };
1287 async move {
1288 let result = match ty.task_type {
1289 LocalTaskType::ResolveNative { native_fn } => {
1290 LocalTaskType::run_resolve_native(
1291 native_fn,
1292 ty.this,
1293 &*ty.arg,
1294 persistence,
1295 this,
1296 )
1297 .await
1298 }
1299 LocalTaskType::ResolveTrait { trait_method } => {
1300 LocalTaskType::run_resolve_trait(
1301 trait_method,
1302 ty.this.unwrap(),
1303 &*ty.arg,
1304 persistence,
1305 this,
1306 )
1307 .await
1308 }
1309 };
1310
1311 let output = match result {
1312 Ok(raw_vc) => OutputContent::Link(raw_vc),
1313 Err(err) => OutputContent::Error(
1314 TurboTasksExecutionError::from(err)
1315 .with_local_task_context(task_type.to_string()),
1316 ),
1317 };
1318
1319 let local_task = LocalTask::Done { output };
1320
1321 let done_event = CURRENT_TASK_STATE.with(move |gts| {
1322 let mut gts_write = gts.write().unwrap();
1323 let scheduled_task = std::mem::replace(
1324 gts_write.get_mut_local_task(local_task_id),
1325 local_task,
1326 );
1327 let LocalTask::Scheduled { done_event } = scheduled_task else {
1328 panic!("local task finished, but was not in the scheduled state?");
1329 };
1330 done_event
1331 });
1332 done_event.notify(usize::MAX)
1333 }
1334 .instrument(span)
1335 .await
1336 };
1337 let future = CURRENT_TASK_STATE.scope(global_task_state, future);
1338
1339 Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
1340 }
1341 }
1342 }
1343}
1344
1345struct FinishedTaskState {
1346 #[cfg(feature = "verify_determinism")]
1349 stateful: bool,
1350
1351 has_invalidator: bool,
1353}
1354
1355impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1356 fn dynamic_call(
1357 &self,
1358 native_fn: &'static NativeFunction,
1359 this: Option<RawVc>,
1360 arg: Box<dyn MagicAny>,
1361 persistence: TaskPersistence,
1362 ) -> RawVc {
1363 self.dynamic_call(native_fn, this, arg, persistence)
1364 }
1365 fn native_call(
1366 &self,
1367 native_fn: &'static NativeFunction,
1368 this: Option<RawVc>,
1369 arg: Box<dyn MagicAny>,
1370 persistence: TaskPersistence,
1371 ) -> RawVc {
1372 self.native_call(native_fn, this, arg, persistence)
1373 }
1374 fn trait_call(
1375 &self,
1376 trait_method: &'static TraitMethod,
1377 this: RawVc,
1378 arg: Box<dyn MagicAny>,
1379 persistence: TaskPersistence,
1380 ) -> RawVc {
1381 self.trait_call(trait_method, this, arg, persistence)
1382 }
1383
1384 #[track_caller]
1385 fn run(
1386 &self,
1387 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1388 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1389 let this = self.pin();
1390 Box::pin(async move { this.run(future).await })
1391 }
1392
1393 #[track_caller]
1394 fn run_once(
1395 &self,
1396 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1397 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1398 let this = self.pin();
1399 Box::pin(async move { this.run_once(future).await })
1400 }
1401
1402 #[track_caller]
1403 fn run_once_with_reason(
1404 &self,
1405 reason: StaticOrArc<dyn InvalidationReason>,
1406 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1407 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1408 {
1409 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1410 reason_set.insert(reason);
1411 }
1412 let this = self.pin();
1413 Box::pin(async move { this.run_once(future).await })
1414 }
1415
1416 #[track_caller]
1417 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1418 self.start_once_process(future)
1419 }
1420
1421 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1422 if let Err(e) = self.compilation_events.send(event) {
1423 tracing::warn!("Failed to send compilation event: {e}");
1424 }
1425 }
1426
1427 fn get_task_name(&self, task: TaskId) -> String {
1428 self.backend.get_task_name(task, self)
1429 }
1430}
1431
1432impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1433 #[instrument(level = "info", skip_all, name = "invalidate")]
1434 fn invalidate(&self, task: TaskId) {
1435 self.backend.invalidate_task(task, self);
1436 }
1437
1438 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1439 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1440 {
1441 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1442 reason_set.insert(reason);
1443 }
1444 self.backend.invalidate_task(task, self);
1445 }
1446
1447 fn invalidate_serialization(&self, task: TaskId) {
1448 self.backend.invalidate_serialization(task, self);
1449 }
1450
1451 #[track_caller]
1452 fn try_read_task_output(
1453 &self,
1454 task: TaskId,
1455 options: ReadOutputOptions,
1456 ) -> Result<Result<RawVc, EventListener>> {
1457 if options.consistency == ReadConsistency::Eventual {
1458 debug_assert_not_in_top_level_task("read_task_output");
1459 }
1460 self.backend.try_read_task_output(
1461 task,
1462 current_task_if_available("reading Vcs"),
1463 options,
1464 self,
1465 )
1466 }
1467
1468 #[track_caller]
1469 fn try_read_task_cell(
1470 &self,
1471 task: TaskId,
1472 index: CellId,
1473 options: ReadCellOptions,
1474 ) -> Result<Result<TypedCellContent, EventListener>> {
1475 let reader = current_task_if_available("reading Vcs");
1476 self.backend
1477 .try_read_task_cell(task, index, reader, options, self)
1478 }
1479
1480 fn try_read_own_task_cell(
1481 &self,
1482 current_task: TaskId,
1483 index: CellId,
1484 options: ReadCellOptions,
1485 ) -> Result<TypedCellContent> {
1486 self.backend
1487 .try_read_own_task_cell(current_task, index, options, self)
1488 }
1489
1490 #[track_caller]
1491 fn try_read_local_output(
1492 &self,
1493 execution_id: ExecutionId,
1494 local_task_id: LocalTaskId,
1495 ) -> Result<Result<RawVc, EventListener>> {
1496 debug_assert_not_in_top_level_task("read_local_output");
1497 CURRENT_TASK_STATE.with(|gts| {
1498 let gts_read = gts.read().unwrap();
1499
1500 gts_read.assert_execution_id(execution_id);
1505
1506 match gts_read.get_local_task(local_task_id) {
1507 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1508 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1509 }
1510 })
1511 }
1512
1513 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1514 self.backend.read_task_collectibles(
1517 task,
1518 trait_id,
1519 current_task_if_available("reading collectibles"),
1520 self,
1521 )
1522 }
1523
1524 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1525 self.backend.emit_collectible(
1526 trait_type,
1527 collectible,
1528 current_task("emitting collectible"),
1529 self,
1530 );
1531 }
1532
1533 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1534 self.backend.unemit_collectible(
1535 trait_type,
1536 collectible,
1537 count,
1538 current_task("emitting collectible"),
1539 self,
1540 );
1541 }
1542
1543 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1544 for (&collectible, &count) in collectibles {
1545 if count > 0 {
1546 self.backend.unemit_collectible(
1547 trait_type,
1548 collectible,
1549 count as u32,
1550 current_task("emitting collectible"),
1551 self,
1552 );
1553 }
1554 }
1555 }
1556
1557 fn read_own_task_cell(
1558 &self,
1559 task: TaskId,
1560 index: CellId,
1561 options: ReadCellOptions,
1562 ) -> Result<TypedCellContent> {
1563 self.try_read_own_task_cell(task, index, options)
1564 }
1565
1566 fn update_own_task_cell(
1567 &self,
1568 task: TaskId,
1569 index: CellId,
1570 is_serializable_cell_content: bool,
1571 content: CellContent,
1572 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1573 verification_mode: VerificationMode,
1574 ) {
1575 self.backend.update_task_cell(
1576 task,
1577 index,
1578 is_serializable_cell_content,
1579 content,
1580 updated_key_hashes,
1581 verification_mode,
1582 self,
1583 );
1584 }
1585
1586 fn connect_task(&self, task: TaskId) {
1587 self.backend
1588 .connect_task(task, current_task_if_available("connecting task"), self);
1589 }
1590
1591 fn mark_own_task_as_finished(&self, task: TaskId) {
1592 self.backend.mark_own_task_as_finished(task, self);
1593 }
1594
1595 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1596 self.backend.mark_own_task_as_session_dependent(task, self);
1597 }
1598
1599 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1602 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1605 let fut = tokio::spawn(TURBO_TASKS.scope(
1606 turbo_tasks(),
1607 CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1608 ));
1609 let fut = Box::pin(async move {
1610 fut.await.unwrap();
1611 });
1612 let mut ts = global_task_state.write().unwrap();
1613 ts.local_task_tracker
1614 .get_or_insert_default()
1615 .push(Either::Right(fut));
1616 }
1617
1618 fn task_statistics(&self) -> &TaskStatisticsApi {
1619 self.backend.task_statistics()
1620 }
1621
1622 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1623 let this = self.pin();
1624 Box::pin(async move {
1625 this.stop_and_wait().await;
1626 })
1627 }
1628
1629 fn subscribe_to_compilation_events(
1630 &self,
1631 event_types: Option<Vec<String>>,
1632 ) -> Receiver<Arc<dyn CompilationEvent>> {
1633 self.compilation_events.subscribe(event_types)
1634 }
1635
1636 fn is_tracking_dependencies(&self) -> bool {
1637 self.backend.is_tracking_dependencies()
1638 }
1639}
1640
1641impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1642 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1643 self.pin()
1644 }
1645 fn backend(&self) -> &B {
1646 &self.backend
1647 }
1648
1649 #[track_caller]
1650 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1651 self.schedule_background_job(async move |this| {
1652 this.backend.run_backend_job(job, &*this).await;
1653 this
1654 })
1655 }
1656
1657 #[track_caller]
1658 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1659 self.schedule_foreground_job(async move |this| {
1660 this.backend.run_backend_job(job, &*this).await;
1661 this
1662 })
1663 }
1664
1665 #[track_caller]
1666 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1667 self.schedule(task, priority)
1668 }
1669
1670 fn get_current_task_priority(&self) -> TaskPriority {
1671 CURRENT_TASK_STATE
1672 .try_with(|task_state| task_state.read().unwrap().priority)
1673 .unwrap_or(TaskPriority::initial())
1674 }
1675
1676 fn program_duration_until(&self, instant: Instant) -> Duration {
1677 instant - self.program_start
1678 }
1679
1680 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1681 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1683 }
1684
1685 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1686 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1688 }
1689
1690 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1691 unsafe { self.task_id_factory.reuse(id.into()) }
1692 }
1693
1694 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1695 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1696 }
1697
1698 fn is_idle(&self) -> bool {
1699 self.currently_scheduled_foreground_jobs
1700 .load(Ordering::Acquire)
1701 == 0
1702 }
1703}
1704
1705async fn wait_for_local_tasks() {
1706 while let Some(mut ltt) =
1707 CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1708 {
1709 use futures::StreamExt;
1710 while ltt.next().await.is_some() {}
1711 }
1712}
1713
1714pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1715 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1716 Ok(id) => id,
1717 Err(_) => panic!(
1718 "{from} can only be used in the context of a turbo_tasks task execution or \
1719 turbo_tasks run"
1720 ),
1721 }
1722}
1723
1724pub(crate) fn current_task(from: &str) -> TaskId {
1725 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1726 Ok(Some(id)) => id,
1727 Ok(None) | Err(_) => {
1728 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1729 }
1730 }
1731}
1732
1733#[track_caller]
1734fn debug_assert_not_in_top_level_task(operation: &str) {
1735 if !cfg!(debug_assertions) {
1736 return;
1737 }
1738
1739 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1742 .try_with(|&suppressed| suppressed)
1743 .unwrap_or(false);
1744 if suppressed {
1745 return;
1746 }
1747
1748 let in_top_level = CURRENT_TASK_STATE
1749 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1750 .unwrap_or(false);
1751 if in_top_level {
1752 panic!(
1753 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1754 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1755 reads to avoid leaking inconsistent return values."
1756 );
1757 }
1758}
1759
1760pub async fn run<T: Send + 'static>(
1761 tt: Arc<dyn TurboTasksApi>,
1762 future: impl Future<Output = Result<T>> + Send + 'static,
1763) -> Result<T> {
1764 let (tx, rx) = tokio::sync::oneshot::channel();
1765
1766 tt.run(Box::pin(async move {
1767 let result = future.await?;
1768 tx.send(result)
1769 .map_err(|_| anyhow!("unable to send result"))?;
1770 Ok(())
1771 }))
1772 .await?;
1773
1774 Ok(rx.await?)
1775}
1776
1777pub async fn run_once<T: Send + 'static>(
1778 tt: Arc<dyn TurboTasksApi>,
1779 future: impl Future<Output = Result<T>> + Send + 'static,
1780) -> Result<T> {
1781 let (tx, rx) = tokio::sync::oneshot::channel();
1782
1783 tt.run_once(Box::pin(async move {
1784 let result = future.await?;
1785 tx.send(result)
1786 .map_err(|_| anyhow!("unable to send result"))?;
1787 Ok(())
1788 }))
1789 .await?;
1790
1791 Ok(rx.await?)
1792}
1793
1794pub async fn run_once_with_reason<T: Send + 'static>(
1795 tt: Arc<dyn TurboTasksApi>,
1796 reason: impl InvalidationReason,
1797 future: impl Future<Output = Result<T>> + Send + 'static,
1798) -> Result<T> {
1799 let (tx, rx) = tokio::sync::oneshot::channel();
1800
1801 tt.run_once_with_reason(
1802 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1803 Box::pin(async move {
1804 let result = future.await?;
1805 tx.send(result)
1806 .map_err(|_| anyhow!("unable to send result"))?;
1807 Ok(())
1808 }),
1809 )
1810 .await?;
1811
1812 Ok(rx.await?)
1813}
1814
1815pub fn dynamic_call(
1817 func: &'static NativeFunction,
1818 this: Option<RawVc>,
1819 arg: Box<dyn MagicAny>,
1820 persistence: TaskPersistence,
1821) -> RawVc {
1822 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1823}
1824
1825pub fn trait_call(
1827 trait_method: &'static TraitMethod,
1828 this: RawVc,
1829 arg: Box<dyn MagicAny>,
1830 persistence: TaskPersistence,
1831) -> RawVc {
1832 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1833}
1834
1835pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1836 TURBO_TASKS.with(|arc| arc.clone())
1837}
1838
1839pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1840 TURBO_TASKS.with(Arc::downgrade)
1841}
1842
1843pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1844 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1845}
1846
1847pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1848 TURBO_TASKS.with(|arc| func(arc))
1849}
1850
1851pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1852 TURBO_TASKS.sync_scope(tt, f)
1853}
1854
1855pub fn turbo_tasks_future_scope<T>(
1856 tt: Arc<dyn TurboTasksApi>,
1857 f: impl Future<Output = T>,
1858) -> impl Future<Output = T> {
1859 TURBO_TASKS.scope(tt, f)
1860}
1861
1862pub fn with_turbo_tasks_for_testing<T>(
1863 tt: Arc<dyn TurboTasksApi>,
1864 current_task: TaskId,
1865 execution_id: ExecutionId,
1866 f: impl Future<Output = T>,
1867) -> impl Future<Output = T> {
1868 TURBO_TASKS.scope(
1869 tt,
1870 CURRENT_TASK_STATE.scope(
1871 Arc::new(RwLock::new(CurrentTaskState::new(
1872 current_task,
1873 execution_id,
1874 TaskPriority::initial(),
1875 false, ))),
1877 f,
1878 ),
1879 )
1880}
1881
1882pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1887 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1888}
1889
1890pub fn current_task_for_testing() -> Option<TaskId> {
1891 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1892}
1893
1894pub fn mark_session_dependent() {
1896 with_turbo_tasks(|tt| {
1897 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1898 });
1899}
1900
1901pub fn mark_finished() {
1904 with_turbo_tasks(|tt| {
1905 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1906 });
1907}
1908
1909pub fn get_serialization_invalidator() -> SerializationInvalidator {
1915 CURRENT_TASK_STATE.with(|cell| {
1916 let CurrentTaskState {
1917 task_id,
1918 #[cfg(feature = "verify_determinism")]
1919 stateful,
1920 ..
1921 } = &mut *cell.write().unwrap();
1922 #[cfg(feature = "verify_determinism")]
1923 {
1924 *stateful = true;
1925 }
1926 let Some(task_id) = *task_id else {
1927 panic!(
1928 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1929 task execution"
1930 );
1931 };
1932 SerializationInvalidator::new(task_id)
1933 })
1934}
1935
1936pub fn mark_invalidator() {
1937 CURRENT_TASK_STATE.with(|cell| {
1938 let CurrentTaskState {
1939 has_invalidator, ..
1940 } = &mut *cell.write().unwrap();
1941 *has_invalidator = true;
1942 })
1943}
1944
1945pub fn mark_stateful() {
1951 #[cfg(feature = "verify_determinism")]
1952 {
1953 CURRENT_TASK_STATE.with(|cell| {
1954 let CurrentTaskState { stateful, .. } = &mut *cell.write().unwrap();
1955 *stateful = true;
1956 })
1957 }
1958 }
1960
1961pub fn mark_top_level_task() {
1965 if cfg!(debug_assertions) {
1966 CURRENT_TASK_STATE.with(|cell| {
1967 cell.write().unwrap().in_top_level_task = true;
1968 })
1969 }
1970}
1971
1972pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1983 if cfg!(debug_assertions) {
1984 CURRENT_TASK_STATE.with(|cell| {
1985 cell.write().unwrap().in_top_level_task = false;
1986 })
1987 }
1988}
1989
1990pub fn prevent_gc() {
1991 }
1993
1994pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1995 with_turbo_tasks(|tt| {
1996 let raw_vc = collectible.node.node;
1997 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1998 })
1999}
2000
2001pub(crate) async fn read_task_output(
2002 this: &dyn TurboTasksApi,
2003 id: TaskId,
2004 options: ReadOutputOptions,
2005) -> Result<RawVc> {
2006 loop {
2007 match this.try_read_task_output(id, options)? {
2008 Ok(result) => return Ok(result),
2009 Err(listener) => listener.await,
2010 }
2011 }
2012}
2013
2014#[derive(Clone, Copy)]
2020pub struct CurrentCellRef {
2021 current_task: TaskId,
2022 index: CellId,
2023 is_serializable_cell_content: bool,
2024}
2025
2026type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2027
2028impl CurrentCellRef {
2029 fn conditional_update<T>(
2031 &self,
2032 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>)>,
2033 ) where
2034 T: VcValueType,
2035 {
2036 self.conditional_update_with_shared_reference(|old_shared_reference| {
2037 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2038 let (new_value, updated_key_hashes) = functor(old_ref)?;
2039 Some((
2040 SharedReference::new(triomphe::Arc::new(new_value)),
2041 updated_key_hashes,
2042 ))
2043 })
2044 }
2045
2046 fn conditional_update_with_shared_reference(
2048 &self,
2049 functor: impl FnOnce(
2050 Option<&SharedReference>,
2051 ) -> Option<(SharedReference, Option<SmallVec<[u64; 2]>>)>,
2052 ) {
2053 let tt = turbo_tasks();
2054 let cell_content = tt
2055 .read_own_task_cell(
2056 self.current_task,
2057 self.index,
2058 ReadCellOptions {
2059 tracking: ReadCellTracking::Untracked,
2061 is_serializable_cell_content: self.is_serializable_cell_content,
2062 final_read_hint: false,
2063 },
2064 )
2065 .ok();
2066 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2067 if let Some((update, updated_key_hashes)) = update {
2068 tt.update_own_task_cell(
2069 self.current_task,
2070 self.index,
2071 self.is_serializable_cell_content,
2072 CellContent(Some(update)),
2073 updated_key_hashes,
2074 VerificationMode::EqualityCheck,
2075 )
2076 }
2077 }
2078
2079 pub fn compare_and_update<T>(&self, new_value: T)
2113 where
2114 T: PartialEq + VcValueType,
2115 {
2116 self.conditional_update(|old_value| {
2117 if let Some(old_value) = old_value
2118 && old_value == &new_value
2119 {
2120 return None;
2121 }
2122 Some((new_value, None))
2123 });
2124 }
2125
2126 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2134 where
2135 T: VcValueType + PartialEq,
2136 {
2137 self.conditional_update_with_shared_reference(|old_sr| {
2138 if let Some(old_sr) = old_sr {
2139 let old_value = extract_sr_value::<T>(old_sr);
2140 let new_value = extract_sr_value::<T>(&new_shared_reference);
2141 if old_value == new_value {
2142 return None;
2143 }
2144 }
2145 Some((new_shared_reference, None))
2146 });
2147 }
2148
2149 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2151 where
2152 T: PartialEq + VcValueType,
2153 VcReadTarget<T>: KeyedEq,
2154 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2155 {
2156 self.conditional_update(|old_value| {
2157 let Some(old_value) = old_value else {
2158 return Some((new_value, None));
2159 };
2160 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2161 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2162 let updated_keys = old_value.different_keys(new_value_ref);
2163 if updated_keys.is_empty() {
2164 return None;
2165 }
2166 let updated_key_hashes = updated_keys
2168 .into_iter()
2169 .map(|key| FxBuildHasher.hash_one(key))
2170 .collect();
2171 Some((new_value, Some(updated_key_hashes)))
2172 });
2173 }
2174
2175 pub fn keyed_compare_and_update_with_shared_reference<T>(
2178 &self,
2179 new_shared_reference: SharedReference,
2180 ) where
2181 T: VcValueType + PartialEq,
2182 VcReadTarget<T>: KeyedEq,
2183 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2184 {
2185 self.conditional_update_with_shared_reference(|old_sr| {
2186 let Some(old_sr) = old_sr else {
2187 return Some((new_shared_reference, None));
2188 };
2189 let old_value = extract_sr_value::<T>(old_sr);
2190 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2191 let new_value = extract_sr_value::<T>(&new_shared_reference);
2192 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2193 let updated_keys = old_value.different_keys(new_value);
2194 if updated_keys.is_empty() {
2195 return None;
2196 }
2197 let updated_key_hashes = updated_keys
2199 .into_iter()
2200 .map(|key| FxBuildHasher.hash_one(key))
2201 .collect();
2202 Some((new_shared_reference, Some(updated_key_hashes)))
2203 });
2204 }
2205
2206 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2208 where
2209 T: VcValueType,
2210 {
2211 let tt = turbo_tasks();
2212 tt.update_own_task_cell(
2213 self.current_task,
2214 self.index,
2215 self.is_serializable_cell_content,
2216 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2217 None,
2218 verification_mode,
2219 )
2220 }
2221
2222 pub fn update_with_shared_reference(
2230 &self,
2231 shared_ref: SharedReference,
2232 verification_mode: VerificationMode,
2233 ) {
2234 let tt = turbo_tasks();
2235 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2236 let content = tt
2237 .read_own_task_cell(
2238 self.current_task,
2239 self.index,
2240 ReadCellOptions {
2241 tracking: ReadCellTracking::Untracked,
2243 is_serializable_cell_content: self.is_serializable_cell_content,
2244 final_read_hint: false,
2245 },
2246 )
2247 .ok();
2248 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2249 shared_ref_exp != shared_ref
2251 } else {
2252 true
2253 }
2254 } else {
2255 true
2256 };
2257 if update {
2258 tt.update_own_task_cell(
2259 self.current_task,
2260 self.index,
2261 self.is_serializable_cell_content,
2262 CellContent(Some(shared_ref)),
2263 None,
2264 verification_mode,
2265 )
2266 }
2267 }
2268}
2269
2270impl From<CurrentCellRef> for RawVc {
2271 fn from(cell: CurrentCellRef) -> Self {
2272 RawVc::TaskCell(cell.current_task, cell.index)
2273 }
2274}
2275
2276fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2277 sr.0.downcast_ref::<T>()
2278 .expect("cannot update SharedReference of different type")
2279}
2280
2281pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2282 find_cell_by_id(T::get_value_type_id(), T::has_serialization())
2283}
2284
2285pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2286 CURRENT_TASK_STATE.with(|ts| {
2287 let current_task = current_task("celling turbo_tasks values");
2288 let mut ts = ts.write().unwrap();
2289 let map = ts.cell_counters.as_mut().unwrap();
2290 let current_index = map.entry(ty).or_default();
2291 let index = *current_index;
2292 *current_index += 1;
2293 CurrentCellRef {
2294 current_task,
2295 index: CellId { type_id: ty, index },
2296 is_serializable_cell_content,
2297 }
2298 })
2299}
2300
2301pub(crate) async fn read_local_output(
2302 this: &dyn TurboTasksApi,
2303 execution_id: ExecutionId,
2304 local_task_id: LocalTaskId,
2305) -> Result<RawVc> {
2306 loop {
2307 match this.try_read_local_output(execution_id, local_task_id)? {
2308 Ok(raw_vc) => return Ok(raw_vc),
2309 Err(event_listener) => event_listener.await,
2310 }
2311 }
2312}