1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 pin::Pin,
8 sync::{
9 Arc, Mutex, RwLock, Weak,
10 atomic::{AtomicBool, AtomicUsize, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
26
27use crate::{
28 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
29 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
30 VcValueTrait, VcValueType,
31 backend::{
32 Backend, CachedTaskType, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec,
33 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
34 },
35 capture_future::CaptureFuture,
36 event::{Event, EventListener},
37 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
38 id_factory::IdFactoryWithReuse,
39 keyed::KeyedEq,
40 macro_helpers::NativeFunction,
41 magic_any::MagicAny,
42 message_queue::{CompilationEvent, CompilationEventQueue},
43 priority_runner::{Executor, JoinHandle, PriorityRunner},
44 raw_vc::{CellId, RawVc},
45 registry,
46 serialization_invalidation::SerializationInvalidator,
47 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
48 task_statistics::TaskStatisticsApi,
49 trace::TraceRawVcs,
50 util::{IdFactory, StaticOrArc},
51};
52
/// Object-safe entry points for calling turbo-tasks functions and for driving futures inside
/// a turbo-tasks scope.
///
/// [`TurboTasks`] implements this trait; the methods of that impl visible in this file
/// forward to the inherent methods of the same name.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with an optional `this` receiver and a type-erased `arg`.
    ///
    /// In the [`TurboTasks`] implementation this goes straight to `native_call` when `this`
    /// and `arg` are already resolved, and otherwise schedules a local task that performs
    /// the resolution first.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` without a resolution step.
    ///
    /// The [`TurboTasks`] implementation asks the backend to get or create the task for
    /// this call (persistent or transient, depending on `persistence`) and returns it as a
    /// `RawVc::TaskOutput`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on `this`.
    ///
    /// The [`TurboTasks`] implementation dispatches immediately when `this` is a
    /// `RawVc::TaskCell` whose value type provides the method, and otherwise schedules a
    /// local task that resolves the trait call.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` inside a turbo-tasks scope; failures (including panics, in the
    /// [`TurboTasks`] implementation) are reported as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-off task and resolves with its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`TurboTasksCallApi::run_once`], but the [`TurboTasks`] implementation first
    /// records `reason` in the set of invalidation reasons reported with the next update.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` as a detached one-off process; the call does not wait for it.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Publishes a compilation event.
    // NOTE(review): delivery semantics are implemented outside the visible part of this file.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a name for `task`.
    // NOTE(review): presumably a human-readable name for diagnostics — confirm.
    fn get_task_name(&self, task: TaskId) -> String;
}
105
/// Object-safe task-level API: invalidation, reading outputs and cells, collectibles and
/// lifecycle control.
///
/// This is the type stored in the `TURBO_TASKS` task-local of this file.
// NOTE(review): the implementation lives outside the visible chunk, so the notes below
// describe signatures; behavioural remarks are marked as assumptions.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and attaches `reason` to the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    // NOTE(review): presumably marks persisted data of the task as stale — confirm.
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read the output of `task`.
    ///
    /// The outer `Result` carries errors; the inner `Err` carries an [`EventListener`].
    // NOTE(review): presumably the listener fires once the output becomes available — confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read cell `index` of `task`; result nesting as in
    /// [`TurboTasksApi::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Attempts to read the output of the local task `local_task_id` created during
    /// execution `execution_id`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of trait `trait_id` associated with `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for trait `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` emissions of `collectible` for trait `trait_type`.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts every entry of `collectibles` for trait `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Attempts to read cell `index` of `current_task` from within that task.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads cell `index` of `task` from within that task.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Stores `content` into cell `index` of `task` from within that task.
    // NOTE(review): the meaning of `updated_key_hashes`, `content_hash` and
    // `verification_mode` is defined by the backend and not visible here.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished from within that task.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks `task` as session dependent from within that task.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task`.
    // NOTE(review): presumably connects it to the currently executing task — confirm.
    fn connect_task(&self, task: TaskId);

    /// Spawns `f` detached from any task; intended for tests, as the name says.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task statistics API object.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves once it has shut down.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally restricted to `event_types`.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Reports whether dependency tracking is enabled.
    fn is_tracking_dependencies(&self) -> bool;
}
204
/// Wrapper that marks the wrapped value as "unused".
///
/// Construction and by-reference access are `unsafe`; only [`Unused::into`] hands the value
/// out safely. In this file it wraps freshly allocated [`TaskId`]s.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in the visible code; presumably the
    /// caller must guarantee that `inner` really is unused elsewhere — confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must not treat the borrowed value as taken into
    /// use while the wrapper still exists — confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
234
/// API surface handed to a [`Backend`] implementation `B`.
// NOTE(review): the implementation is outside the visible chunk; the notes describe the
// signatures and mark behavioural remarks as assumptions.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owned, reference-counted handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a persistent task id, wrapped in [`Unused`].
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a transient task id, wrapped in [`Unused`].
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Returns a persistent task id for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must never have been used — confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Returns a transient task id for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must never have been used — confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution with the given `priority`.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the currently executing task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules a backend job as a foreground job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend job as a background job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Converts `instant` into a duration.
    // NOTE(review): presumably measured from the `program_start` instant stored in
    // `TurboTasks` — confirm.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Reports whether the system is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns the backend.
    fn backend(&self) -> &B;
}
274
/// Summary of one or more completed foreground phases, as produced by
/// `TurboTasks::aggregated_update_info`.
// The private placeholder field keeps the struct from being constructed outside this crate
// while still allowing new fields later; hence the lint allowance.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated wall-clock time of the aggregated foreground phases.
    pub duration: Duration,
    /// Number of tasks scheduled during the aggregated foreground phases.
    pub tasks: usize,
    /// Invalidation reasons collected since the previous report.
    pub reasons: InvalidationReasonSet,
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
283
/// Selects which kind of backend task a call creates.
///
/// `TurboTasks::native_call` routes `Persistent` to
/// `Backend::get_or_create_persistent_task` and `Transient` to
/// `Backend::get_or_create_transient_task`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// The task is created through the backend's persistent-task path.
    Persistent,

    /// The task is created through the backend's transient-task path.
    // NOTE(review): presumably such tasks are not written to a persistent cache — confirm.
    Transient,
}
297
298impl Display for TaskPersistence {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 match self {
301 TaskPersistence::Persistent => write!(f, "persistent"),
302 TaskPersistence::Transient => write!(f, "transient"),
303 }
304 }
305}
306
/// Consistency level requested when reading a task output (see `wait_task_completion`,
/// which passes it on through `ReadOutputOptions`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default level.
    // NOTE(review): presumably the read may observe a value that is still being
    // recomputed — confirm against the backend.
    #[default]
    Eventual,
    /// The stricter level.
    // NOTE(review): presumably waits until dependent work has settled — confirm against
    // the backend.
    Strong,
}
319
/// How a cell read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Always track the read, optionally narrowed to a `key`.
    Tracked { key: Option<u64> },
    /// Track the read only when it produced an error.
    TrackOnlyError,
    /// Never track the read.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it ended in an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the key of a `Tracked` read; `None` for every other case.
    pub fn key(&self) -> Option<u64> {
        match *self {
            Self::Tracked { key } => key,
            Self::TrackOnlyError | Self::Untracked => None,
        }
    }
}

/// The default is a tracked read without a key.
impl Default for ReadCellTracking {
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

/// Human-readable description of the tracking mode.
impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
373
/// How a read is tracked; the key-less counterpart of [`ReadCellTracking`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Always track the read (the default).
    #[default]
    Tracked,
    /// Track the read only when it produced an error.
    TrackOnlyError,
    /// Never track the read.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it ended in an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

/// Human-readable description of the tracking mode.
impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
410
/// Priority under which a task is handed to the priority runner.
///
/// The derived ordering places `Initial` before every `Invalidation`; among
/// `Invalidation` values the wrapped number is compared in reverse (`Reverse<u32>`), so a
/// larger number compares as smaller.
// NOTE(review): which end of the ordering runs first is decided by `PriorityRunner`,
// which is not visible here.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority of work that is not caused by an invalidation (the default).
    #[default]
    Initial,
    /// Priority of work caused by an invalidation.
    Invalidation {
        /// Numeric level, stored reversed for ordering purposes.
        priority: Reverse<u32>,
    },
}
419
420impl TaskPriority {
421 pub fn invalidation(priority: u32) -> Self {
422 Self::Invalidation {
423 priority: Reverse(priority),
424 }
425 }
426
427 pub fn initial() -> Self {
428 Self::Initial
429 }
430
431 pub fn leaf() -> Self {
432 Self::Invalidation {
433 priority: Reverse(0),
434 }
435 }
436
437 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
438 match self {
439 TaskPriority::Initial => parent_priority,
440 TaskPriority::Invalidation { priority } => {
441 if let TaskPriority::Invalidation {
442 priority: parent_priority,
443 } = parent_priority
444 && priority.0 < parent_priority.0
445 {
446 Self::Invalidation {
447 priority: Reverse(parent_priority.0.saturating_add(1)),
448 }
449 } else {
450 *self
451 }
452 }
453 }
454 }
455}
456
457impl Display for TaskPriority {
458 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459 match self {
460 TaskPriority::Initial => write!(f, "initial"),
461 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
462 }
463 }
464}
465
/// Unit of work handed to the priority runner and executed by `TurboTasksExecutor`.
enum ScheduledTask {
    /// Execution of a backend task identified by `task_id`.
    Task {
        task_id: TaskId,
        /// Tracing span captured when the task was scheduled.
        span: Span,
    },
    /// Execution of a local (resolve) task that belongs to a parent task execution.
    LocalTask {
        /// What to resolve, including the receiver and argument.
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        /// Slot of this task inside the parent's `CurrentTaskState::local_tasks`.
        local_task_id: LocalTaskId,
        /// State shared with the parent execution; the local task runs with it installed
        /// as `CURRENT_TASK_STATE`.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span captured when the local task was scheduled.
        span: Span,
    },
}
479
/// Central manager: owns the backend, hands out ids, schedules tasks on the priority
/// runner and keeps the foreground/background job accounting.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference set up by `Arc::new_cyclic`; upgraded by `pin`.
    this: Weak<Self>,
    backend: B,
    /// Id range `TaskId::MIN ..= TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id range `TRANSIENT_TASK_BIT ..= TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Source of ids that distinguish individual executions.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; scheduled jobs and task executions check it before running.
    stopped: AtomicBool,
    /// Number of foreground jobs currently in flight.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs currently in flight.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Tasks scheduled since the counter was last reset in `finish_foreground_job`.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Time at which the foreground job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Pending `(duration, task count)` plus the invalidation reasons for the next
    /// `UpdateInfo`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job count goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job count drops back to zero.
    event_foreground_done: Event,
    /// Notified when the background job count drops back to zero.
    event_background_done: Event,
    /// Instant at which this instance was created.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
504
/// Futures of the local tasks spawned by one task execution.
///
/// `None` until the first future is pushed (see `schedule_local_task`); entries are either
/// a runner [`JoinHandle`] or an arbitrary boxed future.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
508
/// Per-execution state, shared through the `CURRENT_TASK_STATE` task-local between a task
/// execution and the local tasks it spawns.
struct CurrentTaskState {
    /// `None` for temporary states created by `new_temporary` (used by `TurboTasks::run`).
    task_id: Option<TaskId>,
    /// Id of this particular execution; checked by `assert_execution_id`.
    execution_id: ExecutionId,
    /// Priority the execution runs with; inherited by its local tasks.
    priority: TaskPriority,

    /// Reported to the backend on completion when `verify_determinism` is enabled.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Reported to the backend on completion.
    has_invalidator: bool,

    /// Whether this state belongs to a top-level task.
    in_top_level_task: bool,

    /// Per-value-type counters; `None` for temporary states, and taken out of the state
    /// when the execution completes.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks of this execution; `LocalTaskId` `n` lives at index `n - 1`.
    local_tasks: Vec<LocalTask>,

    /// Futures of the spawned local tasks (see [`LocalTaskTracker`]).
    local_task_tracker: LocalTaskTracker,
}
547
548impl CurrentTaskState {
549 fn new(
550 task_id: TaskId,
551 execution_id: ExecutionId,
552 priority: TaskPriority,
553 in_top_level_task: bool,
554 ) -> Self {
555 Self {
556 task_id: Some(task_id),
557 execution_id,
558 priority,
559 #[cfg(feature = "verify_determinism")]
560 stateful: false,
561 has_invalidator: false,
562 in_top_level_task,
563 cell_counters: Some(AutoMap::default()),
564 local_tasks: Vec::new(),
565 local_task_tracker: None,
566 }
567 }
568
569 fn new_temporary(
570 execution_id: ExecutionId,
571 priority: TaskPriority,
572 in_top_level_task: bool,
573 ) -> Self {
574 Self {
575 task_id: None,
576 execution_id,
577 priority,
578 #[cfg(feature = "verify_determinism")]
579 stateful: false,
580 has_invalidator: false,
581 in_top_level_task,
582 cell_counters: None,
583 local_tasks: Vec::new(),
584 local_task_tracker: None,
585 }
586 }
587
588 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
589 if self.execution_id != expected_execution_id {
590 panic!(
591 "Local tasks can only be scheduled/awaited within the same execution of the \
592 parent task that created them"
593 );
594 }
595 }
596
597 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
598 self.local_tasks.push(local_task);
599 if cfg!(debug_assertions) {
601 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
602 } else {
603 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
604 }
605 }
606
607 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
608 &self.local_tasks[(*local_task_id as usize) - 1]
610 }
611
612 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
613 &mut self.local_tasks[(*local_task_id as usize) - 1]
614 }
615}
616
task_local! {
    /// The turbo-tasks instance the current tokio task runs under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the task execution the current tokio task belongs to.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// Flag consulted by the eventual-consistency top-level-task check.
    // NOTE(review): the check itself is outside the visible chunk; presumably `true`
    // suppresses it — confirm.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
630
631impl<B: Backend + 'static> TurboTasks<B> {
632 pub fn new(backend: B) -> Arc<Self> {
638 let task_id_factory = IdFactoryWithReuse::new(
639 TaskId::MIN,
640 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
641 );
642 let transient_task_id_factory =
643 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
644 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
645 let this = Arc::new_cyclic(|this| Self {
646 this: this.clone(),
647 backend,
648 task_id_factory,
649 transient_task_id_factory,
650 execution_id_factory,
651 stopped: AtomicBool::new(false),
652 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
653 currently_scheduled_background_jobs: AtomicUsize::new(0),
654 scheduled_tasks: AtomicUsize::new(0),
655 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
656 start: Default::default(),
657 aggregated_update: Default::default(),
658 event_foreground_done: Event::new(|| {
659 || "TurboTasks::event_foreground_done".to_string()
660 }),
661 event_foreground_start: Event::new(|| {
662 || "TurboTasks::event_foreground_start".to_string()
663 }),
664 event_background_done: Event::new(|| {
665 || "TurboTasks::event_background_done".to_string()
666 }),
667 program_start: Instant::now(),
668 compilation_events: CompilationEventQueue::default(),
669 });
670 this.backend.startup(&*this);
671 this
672 }
673
674 pub fn pin(&self) -> Arc<Self> {
675 self.this.upgrade().unwrap()
676 }
677
678 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
680 where
681 T: ?Sized,
682 F: Fn() -> Fut + Send + Sync + Clone + 'static,
683 Fut: Future<Output = Result<Vc<T>>> + Send,
684 {
685 let id = self.backend.create_transient_task(
686 TransientTaskType::Root(Box::new(move || {
687 let functor = functor.clone();
688 Box::pin(async move {
689 mark_top_level_task();
690 let raw_vc = functor().await?.node;
691 raw_vc.to_non_local().await
692 })
693 })),
694 self,
695 );
696 self.schedule(id, TaskPriority::initial());
697 id
698 }
699
700 pub fn dispose_root_task(&self, task_id: TaskId) {
701 self.backend.dispose_root_task(task_id, self);
702 }
703
704 #[track_caller]
708 fn spawn_once_task<T, Fut>(&self, future: Fut)
709 where
710 T: ?Sized,
711 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
712 {
713 let id = self.backend.create_transient_task(
714 TransientTaskType::Once(Box::pin(async move {
715 mark_top_level_task();
716 let raw_vc = future.await?.node;
717 raw_vc.to_non_local().await
718 })),
719 self,
720 );
721 self.schedule(id, TaskPriority::initial());
722 }
723
724 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
725 &self,
726 future: impl Future<Output = Result<T>> + Send + 'static,
727 ) -> Result<T> {
728 let (tx, rx) = tokio::sync::oneshot::channel();
729 self.spawn_once_task(async move {
730 mark_top_level_task();
731 let result = future.await;
732 tx.send(result)
733 .map_err(|_| anyhow!("unable to send result"))?;
734 Ok(Completion::new())
735 });
736
737 rx.await?
738 }
739
/// Runs `future` directly (without creating a backend task) inside a turbo-tasks scope
/// and returns its result.
///
/// The call counts as one foreground job for its whole duration. Local tasks spawned by
/// `future` are awaited before the result is returned. Errors and captured panics are
/// converted into [`TurboTasksExecutionError`].
#[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
pub async fn run<T: TraceRawVcs + Send + 'static>(
    &self,
    future: impl Future<Output = Result<T>> + Send + 'static,
) -> Result<T, TurboTasksExecutionError> {
    self.begin_foreground_job();
    let execution_id = self.execution_id_factory.wrapping_get();
    // Temporary state: there is no backend task id and there are no cell counters.
    let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
        execution_id,
        TaskPriority::initial(),
        true, // in_top_level_task
    )));

    let result = TURBO_TASKS
        .scope(
            self.pin(),
            CURRENT_TASK_STATE.scope(current_task_state, async {
                // `CaptureFuture` wraps the output in an outer `Result` whose `Err` is
                // mapped to `TurboTasksExecutionError::Panic` below.
                let result = CaptureFuture::new(future).await;

                // Local tasks must be finished before the state goes out of scope.
                wait_for_local_tasks().await;

                match result {
                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                    Ok(Err(err)) => Err(err.into()),
                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                }
            }),
        )
        .await;
    self.finish_foreground_job();
    result
}
774
/// Starts `future` as a detached process running inside a once-task.
///
/// The call returns immediately; the spawned tokio task panics if the once-task fails.
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
    let this = self.pin();
    tokio::spawn(async move {
        this.pin()
            .run_once(async move {
                // The foreground job counter is decremented while `future` runs and
                // incremented again afterwards, so the process itself is not counted as
                // foreground work while the surrounding once-task's accounting stays
                // balanced.
                this.finish_foreground_job();
                future.await;
                this.begin_foreground_job();
                Ok(())
            })
            .await
            .unwrap()
    });
}
789
790 pub(crate) fn native_call(
791 &self,
792 native_fn: &'static NativeFunction,
793 this: Option<RawVc>,
794 arg: Box<dyn MagicAny>,
795 persistence: TaskPersistence,
796 ) -> RawVc {
797 let task_type = CachedTaskType {
798 native_fn,
799 this,
800 arg,
801 };
802 RawVc::TaskOutput(match persistence {
803 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
804 task_type,
805 current_task_if_available("turbo_function calls"),
806 self,
807 ),
808 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
809 task_type,
810 current_task_if_available("turbo_function calls"),
811 self,
812 ),
813 })
814 }
815
816 pub fn dynamic_call(
817 &self,
818 native_fn: &'static NativeFunction,
819 this: Option<RawVc>,
820 arg: Box<dyn MagicAny>,
821 persistence: TaskPersistence,
822 ) -> RawVc {
823 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
824 return self.native_call(native_fn, this, arg, persistence);
825 }
826 let task_type = LocalTaskSpec {
827 task_type: LocalTaskType::ResolveNative { native_fn },
828 this,
829 arg,
830 };
831 self.schedule_local_task(task_type, persistence)
832 }
833
834 pub fn trait_call(
835 &self,
836 trait_method: &'static TraitMethod,
837 this: RawVc,
838 arg: Box<dyn MagicAny>,
839 persistence: TaskPersistence,
840 ) -> RawVc {
841 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
845 match registry::get_value_type(type_id).get_trait_method(trait_method) {
846 Some(native_fn) => {
847 let arg = native_fn.arg_meta.filter_owned(arg);
848 return self.dynamic_call(native_fn, Some(this), arg, persistence);
849 }
850 None => {
851 }
855 }
856 }
857
858 let task_type = LocalTaskSpec {
860 task_type: LocalTaskType::ResolveTrait { trait_method },
861 this: Some(this),
862 arg,
863 };
864
865 self.schedule_local_task(task_type, persistence)
866 }
867
868 #[track_caller]
869 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
870 self.begin_foreground_job();
871 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
872
873 self.priority_runner.schedule(
874 &self.pin(),
875 ScheduledTask::Task {
876 task_id,
877 span: Span::current(),
878 },
879 priority,
880 );
881 }
882
/// Registers a new local task in the current task state, schedules it on the priority
/// runner with the parent's priority and returns a `RawVc::LocalOutput` referring to it.
///
/// Reads `CURRENT_TASK_STATE`, so it must be called from within a task execution.
fn schedule_local_task(
    &self,
    ty: LocalTaskSpec,
    persistence: TaskPersistence,
) -> RawVc {
    let task_type = ty.task_type;
    let (global_task_state, execution_id, priority, local_task_id) =
        CURRENT_TASK_STATE.with(|gts| {
            let mut gts_write = gts.write().unwrap();
            // Reserve the slot first; the executor later replaces it with
            // `LocalTask::Done` and notifies `done_event`.
            let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                done_event: Event::new(move || {
                    move || format!("LocalTask({task_type})::done_event")
                }),
            });
            (
                Arc::clone(gts),
                gts_write.execution_id,
                gts_write.priority,
                local_task_id,
            )
        });

    // The write lock from above is released before scheduling.
    let future = self.priority_runner.schedule_with_join_handle(
        &self.pin(),
        ScheduledTask::LocalTask {
            ty,
            persistence,
            local_task_id,
            global_task_state: global_task_state.clone(),
            span: Span::current(),
        },
        priority,
    );
    // Remember the join handle in the parent's tracker (created on first use).
    global_task_state
        .write()
        .unwrap()
        .local_task_tracker
        .get_or_insert_default()
        .push(Either::Left(future));

    RawVc::LocalOutput(execution_id, local_task_id, persistence)
}
926
927 fn begin_foreground_job(&self) {
928 if self
929 .currently_scheduled_foreground_jobs
930 .fetch_add(1, Ordering::AcqRel)
931 == 0
932 {
933 *self.start.lock().unwrap() = Some(Instant::now());
934 self.event_foreground_start.notify(usize::MAX);
935 self.backend.idle_end(self);
936 }
937 }
938
939 fn finish_foreground_job(&self) {
940 if self
941 .currently_scheduled_foreground_jobs
942 .fetch_sub(1, Ordering::AcqRel)
943 == 1
944 {
945 self.backend.idle_start(self);
946 let total = self.scheduled_tasks.load(Ordering::Acquire);
949 self.scheduled_tasks.store(0, Ordering::Release);
950 if let Some(start) = *self.start.lock().unwrap() {
951 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
952 if let Some(update) = update.as_mut() {
953 update.0 += start.elapsed();
954 update.1 += total;
955 } else {
956 *update = Some((start.elapsed(), total));
957 }
958 }
959 self.event_foreground_done.notify(usize::MAX);
960 }
961 }
962
963 fn begin_background_job(&self) {
964 self.currently_scheduled_background_jobs
965 .fetch_add(1, Ordering::Relaxed);
966 }
967
968 fn finish_background_job(&self) {
969 if self
970 .currently_scheduled_background_jobs
971 .fetch_sub(1, Ordering::Relaxed)
972 == 1
973 {
974 self.event_background_done.notify(usize::MAX);
975 }
976 }
977
978 pub fn get_in_progress_count(&self) -> usize {
979 self.currently_scheduled_foreground_jobs
980 .load(Ordering::Acquire)
981 }
982
983 pub async fn wait_task_completion(
995 &self,
996 id: TaskId,
997 consistency: ReadConsistency,
998 ) -> Result<()> {
999 read_task_output(
1000 self,
1001 id,
1002 ReadOutputOptions {
1003 tracking: ReadTracking::Untracked,
1005 consistency,
1006 },
1007 )
1008 .await?;
1009 Ok(())
1010 }
1011
1012 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
1015 self.aggregated_update_info(aggregation, Duration::MAX)
1016 .await
1017 .unwrap()
1018 }
1019
/// Returns the aggregated update information of the completed foreground phases.
///
/// * With a zero `aggregation` an already pending update is returned immediately.
/// * Otherwise, if no update is pending, the method waits for the next
///   `event_foreground_done`. With a finite `timeout` it first waits for foreground work
///   to start (when nothing is running) and then gives up with `None` when the timeout is
///   zero or expires.
/// * With a non-zero `aggregation` it then keeps collecting until `aggregation` passes
///   without another `event_foreground_done`.
///
/// # Panics
///
/// Panics when no update is pending at the end, which happens when the method is called
/// concurrently.
pub async fn aggregated_update_info(
    &self,
    aggregation: Duration,
    timeout: Duration,
) -> Option<UpdateInfo> {
    // Register the listener before inspecting the state so that a completion happening
    // in between is not missed.
    let listener = self
        .event_foreground_done
        .listen_with_note(|| || "wait for update info".to_string());
    let wait_for_finish = {
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if aggregation.is_zero() {
            if let Some((duration, tasks)) = update.take() {
                return Some(UpdateInfo {
                    duration,
                    tasks,
                    reasons: take(reason_set),
                    placeholder_for_future_fields: (),
                });
            } else {
                true
            }
        } else {
            update.is_none()
        }
    };
    if wait_for_finish {
        if timeout == Duration::MAX {
            // No timeout: wait for the next completion, however long it takes.
            listener.await;
        } else {
            // The timeout only starts counting once foreground work is running.
            let start_listener = self
                .event_foreground_start
                .listen_with_note(|| || "wait for update info".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                == 0
            {
                start_listener.await;
            } else {
                drop(start_listener);
            }
            if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                return None;
            }
        }
    }
    if !aggregation.is_zero() {
        loop {
            select! {
                () = tokio::time::sleep(aggregation) => {
                    break;
                }
                () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                    // Another phase finished: go around again with a fresh sleep.
                }
            }
        }
    }
    let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
    if let Some((duration, tasks)) = update.take() {
        Some(UpdateInfo {
            duration,
            tasks,
            reasons: take(reason_set),
            placeholder_for_future_fields: (),
        })
    } else {
        panic!("aggregated_update_info must not called concurrently")
    }
}
1096
1097 pub async fn wait_background_done(&self) {
1098 let listener = self.event_background_done.listen();
1099 if self
1100 .currently_scheduled_background_jobs
1101 .load(Ordering::Acquire)
1102 != 0
1103 {
1104 listener.await;
1105 }
1106 }
1107
/// Stops the system: notifies the backend, sets the `stopped` flag, waits for foreground
/// and then background jobs to drain, and finally calls `Backend::stop`.
pub async fn stop_and_wait(&self) {
    turbo_tasks_future_scope(self.pin(), async move {
        self.backend.stopping(self);
        // From here on scheduled jobs and task executions skip their work.
        self.stopped.store(true, Ordering::Release);
        {
            // Listen first, then check the counter, so a completion in between is not
            // missed.
            let listener = self
                .event_foreground_done
                .listen_with_note(|| || "wait for stop".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        {
            let listener = self.event_background_done.listen();
            if self
                .currently_scheduled_background_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        self.backend.stop(self);
    })
    .await;
}
1138
1139 #[track_caller]
1140 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1141 where
1142 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1143 T::CallOnceFuture: Send,
1144 {
1145 let mut this = self.pin();
1146 this.begin_foreground_job();
1147 tokio::spawn(
1148 TURBO_TASKS
1149 .scope(this.clone(), async move {
1150 if !this.stopped.load(Ordering::Acquire) {
1151 this = func(this.clone()).await;
1152 }
1153 this.finish_foreground_job();
1154 })
1155 .in_current_span(),
1156 );
1157 }
1158
1159 #[track_caller]
1160 pub(crate) fn schedule_background_job<T>(&self, func: T)
1161 where
1162 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1163 T::CallOnceFuture: Send,
1164 {
1165 let mut this = self.pin();
1166 self.begin_background_job();
1167 tokio::spawn(
1168 TURBO_TASKS
1169 .scope(this.clone(), async move {
1170 if !this.stopped.load(Ordering::Acquire) {
1171 this = func(this).await;
1172 }
1173 this.finish_background_job();
1174 })
1175 .in_current_span(),
1176 );
1177 }
1178
1179 fn finish_current_task_state(&self) -> FinishedTaskState {
1180 CURRENT_TASK_STATE.with(|cell| {
1181 let current_task_state = &*cell.write().unwrap();
1182 FinishedTaskState {
1183 #[cfg(feature = "verify_determinism")]
1184 stateful: current_task_state.stateful,
1185 has_invalidator: current_task_state.has_invalidator,
1186 }
1187 })
1188 }
1189
1190 pub fn backend(&self) -> &B {
1191 &self.backend
1192 }
1193}
1194
/// Stateless executor plugged into the priority runner; turns a [`ScheduledTask`] into the
/// future that runs it.
struct TurboTasksExecutor;
1196
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future that executes `scheduled_task` with `TURBO_TASKS` and
    /// `CURRENT_TASK_STATE` installed.
    ///
    /// * `ScheduledTask::Task` runs the backend task, repeating for as long as
    ///   `Backend::task_execution_completed` returns `true`, and finishes the foreground
    ///   job that `TurboTasks::schedule` started.
    /// * `ScheduledTask::LocalTask` runs a resolve step inside the parent's task state,
    ///   stores its output in the parent's local task slot and notifies waiters.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    let mut schedule_again = true;
                    while schedule_again {
                        // Every (re-)execution gets a fresh execution id and a fresh state.
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return false;
                            }

                            // The backend may decline to start the execution.
                            let Some(TaskExecutionSpec { future, span }) = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)
                            else {
                                return false;
                            };

                            async {
                                // The outer `Result` of `CaptureFuture` is mapped to
                                // `TurboTasksExecutionError::Panic` below.
                                let result = CaptureFuture::new(future).await;

                                // Local tasks must be done before the execution is
                                // reported as completed.
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                // The return value decides whether the loop runs the
                                // task once more.
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        schedule_again = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await;
                    }
                    // Counterpart of `begin_foreground_job` in `TurboTasks::schedule`.
                    this.finish_foreground_job();
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    async move {
                        let result = match ty.task_type {
                            LocalTaskType::ResolveNative { native_fn } => {
                                LocalTaskType::run_resolve_native(
                                    native_fn,
                                    ty.this,
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                            LocalTaskType::ResolveTrait { trait_method } => {
                                LocalTaskType::run_resolve_trait(
                                    trait_method,
                                    ty.this.unwrap(),
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                        };

                        // Success links to the resolved task; failure carries the local
                        // task type as additional error context.
                        let output = match result {
                            Ok(raw_vc) => OutputContent::Link(raw_vc),
                            Err(err) => OutputContent::Error(
                                TurboTasksExecutionError::from(err)
                                    .with_local_task_context(task_type.to_string()),
                            ),
                        };

                        let local_task = LocalTask::Done { output };

                        // Swap the slot from `Scheduled` to `Done` under the write lock and
                        // notify after the lock has been released.
                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
                            let mut gts_write = gts.write().unwrap();
                            let scheduled_task = std::mem::replace(
                                gts_write.get_mut_local_task(local_task_id),
                                local_task,
                            );
                            let LocalTask::Scheduled { done_event } = scheduled_task else {
                                panic!("local task finished, but was not in the scheduled state?");
                            };
                            done_event
                        });
                        done_event.notify(usize::MAX)
                    }
                    .instrument(span)
                    .await
                };
                // Run inside the parent's task state rather than a fresh one.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1348
/// Flags copied out of [`CurrentTaskState`] when an execution completes; they are passed
/// on to `Backend::task_execution_completed`.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful` (only with the `verify_determinism` feature).
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1358
1359impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1360 fn dynamic_call(
1361 &self,
1362 native_fn: &'static NativeFunction,
1363 this: Option<RawVc>,
1364 arg: Box<dyn MagicAny>,
1365 persistence: TaskPersistence,
1366 ) -> RawVc {
1367 self.dynamic_call(native_fn, this, arg, persistence)
1368 }
1369 fn native_call(
1370 &self,
1371 native_fn: &'static NativeFunction,
1372 this: Option<RawVc>,
1373 arg: Box<dyn MagicAny>,
1374 persistence: TaskPersistence,
1375 ) -> RawVc {
1376 self.native_call(native_fn, this, arg, persistence)
1377 }
1378 fn trait_call(
1379 &self,
1380 trait_method: &'static TraitMethod,
1381 this: RawVc,
1382 arg: Box<dyn MagicAny>,
1383 persistence: TaskPersistence,
1384 ) -> RawVc {
1385 self.trait_call(trait_method, this, arg, persistence)
1386 }
1387
1388 #[track_caller]
1389 fn run(
1390 &self,
1391 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1392 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1393 let this = self.pin();
1394 Box::pin(async move { this.run(future).await })
1395 }
1396
1397 #[track_caller]
1398 fn run_once(
1399 &self,
1400 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1401 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1402 let this = self.pin();
1403 Box::pin(async move { this.run_once(future).await })
1404 }
1405
1406 #[track_caller]
1407 fn run_once_with_reason(
1408 &self,
1409 reason: StaticOrArc<dyn InvalidationReason>,
1410 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1411 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1412 {
1413 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1414 reason_set.insert(reason);
1415 }
1416 let this = self.pin();
1417 Box::pin(async move { this.run_once(future).await })
1418 }
1419
1420 #[track_caller]
1421 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1422 self.start_once_process(future)
1423 }
1424
1425 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1426 if let Err(e) = self.compilation_events.send(event) {
1427 tracing::warn!("Failed to send compilation event: {e}");
1428 }
1429 }
1430
1431 fn get_task_name(&self, task: TaskId) -> String {
1432 self.backend.get_task_name(task, self)
1433 }
1434}
1435
1436impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1437 #[instrument(level = "info", skip_all, name = "invalidate")]
1438 fn invalidate(&self, task: TaskId) {
1439 self.backend.invalidate_task(task, self);
1440 }
1441
1442 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1443 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1444 {
1445 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1446 reason_set.insert(reason);
1447 }
1448 self.backend.invalidate_task(task, self);
1449 }
1450
1451 fn invalidate_serialization(&self, task: TaskId) {
1452 self.backend.invalidate_serialization(task, self);
1453 }
1454
1455 #[track_caller]
1456 fn try_read_task_output(
1457 &self,
1458 task: TaskId,
1459 options: ReadOutputOptions,
1460 ) -> Result<Result<RawVc, EventListener>> {
1461 if options.consistency == ReadConsistency::Eventual {
1462 debug_assert_not_in_top_level_task("read_task_output");
1463 }
1464 self.backend.try_read_task_output(
1465 task,
1466 current_task_if_available("reading Vcs"),
1467 options,
1468 self,
1469 )
1470 }
1471
1472 #[track_caller]
1473 fn try_read_task_cell(
1474 &self,
1475 task: TaskId,
1476 index: CellId,
1477 options: ReadCellOptions,
1478 ) -> Result<Result<TypedCellContent, EventListener>> {
1479 let reader = current_task_if_available("reading Vcs");
1480 self.backend
1481 .try_read_task_cell(task, index, reader, options, self)
1482 }
1483
1484 fn try_read_own_task_cell(
1485 &self,
1486 current_task: TaskId,
1487 index: CellId,
1488 options: ReadCellOptions,
1489 ) -> Result<TypedCellContent> {
1490 self.backend
1491 .try_read_own_task_cell(current_task, index, options, self)
1492 }
1493
1494 #[track_caller]
1495 fn try_read_local_output(
1496 &self,
1497 execution_id: ExecutionId,
1498 local_task_id: LocalTaskId,
1499 ) -> Result<Result<RawVc, EventListener>> {
1500 debug_assert_not_in_top_level_task("read_local_output");
1501 CURRENT_TASK_STATE.with(|gts| {
1502 let gts_read = gts.read().unwrap();
1503
1504 gts_read.assert_execution_id(execution_id);
1509
1510 match gts_read.get_local_task(local_task_id) {
1511 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1512 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1513 }
1514 })
1515 }
1516
1517 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1518 self.backend.read_task_collectibles(
1521 task,
1522 trait_id,
1523 current_task_if_available("reading collectibles"),
1524 self,
1525 )
1526 }
1527
1528 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1529 self.backend.emit_collectible(
1530 trait_type,
1531 collectible,
1532 current_task("emitting collectible"),
1533 self,
1534 );
1535 }
1536
1537 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1538 self.backend.unemit_collectible(
1539 trait_type,
1540 collectible,
1541 count,
1542 current_task("emitting collectible"),
1543 self,
1544 );
1545 }
1546
1547 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1548 for (&collectible, &count) in collectibles {
1549 if count > 0 {
1550 self.backend.unemit_collectible(
1551 trait_type,
1552 collectible,
1553 count as u32,
1554 current_task("emitting collectible"),
1555 self,
1556 );
1557 }
1558 }
1559 }
1560
1561 fn read_own_task_cell(
1562 &self,
1563 task: TaskId,
1564 index: CellId,
1565 options: ReadCellOptions,
1566 ) -> Result<TypedCellContent> {
1567 self.try_read_own_task_cell(task, index, options)
1568 }
1569
1570 fn update_own_task_cell(
1571 &self,
1572 task: TaskId,
1573 index: CellId,
1574 is_serializable_cell_content: bool,
1575 content: CellContent,
1576 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1577 content_hash: Option<CellHash>,
1578 verification_mode: VerificationMode,
1579 ) {
1580 self.backend.update_task_cell(
1581 task,
1582 index,
1583 is_serializable_cell_content,
1584 content,
1585 updated_key_hashes,
1586 content_hash,
1587 verification_mode,
1588 self,
1589 );
1590 }
1591
1592 fn connect_task(&self, task: TaskId) {
1593 self.backend
1594 .connect_task(task, current_task_if_available("connecting task"), self);
1595 }
1596
1597 fn mark_own_task_as_finished(&self, task: TaskId) {
1598 self.backend.mark_own_task_as_finished(task, self);
1599 }
1600
1601 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1602 self.backend.mark_own_task_as_session_dependent(task, self);
1603 }
1604
1605 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1608 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1611 let fut = tokio::spawn(TURBO_TASKS.scope(
1612 turbo_tasks(),
1613 CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1614 ));
1615 let fut = Box::pin(async move {
1616 fut.await.unwrap();
1617 });
1618 let mut ts = global_task_state.write().unwrap();
1619 ts.local_task_tracker
1620 .get_or_insert_default()
1621 .push(Either::Right(fut));
1622 }
1623
1624 fn task_statistics(&self) -> &TaskStatisticsApi {
1625 self.backend.task_statistics()
1626 }
1627
1628 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1629 let this = self.pin();
1630 Box::pin(async move {
1631 this.stop_and_wait().await;
1632 })
1633 }
1634
1635 fn subscribe_to_compilation_events(
1636 &self,
1637 event_types: Option<Vec<String>>,
1638 ) -> Receiver<Arc<dyn CompilationEvent>> {
1639 self.compilation_events.subscribe(event_types)
1640 }
1641
1642 fn is_tracking_dependencies(&self) -> bool {
1643 self.backend.is_tracking_dependencies()
1644 }
1645}
1646
1647impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1648 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1649 self.pin()
1650 }
1651 fn backend(&self) -> &B {
1652 &self.backend
1653 }
1654
1655 #[track_caller]
1656 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1657 self.schedule_background_job(async move |this| {
1658 this.backend.run_backend_job(job, &*this).await;
1659 this
1660 })
1661 }
1662
1663 #[track_caller]
1664 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1665 self.schedule_foreground_job(async move |this| {
1666 this.backend.run_backend_job(job, &*this).await;
1667 this
1668 })
1669 }
1670
1671 #[track_caller]
1672 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1673 self.schedule(task, priority)
1674 }
1675
1676 fn get_current_task_priority(&self) -> TaskPriority {
1677 CURRENT_TASK_STATE
1678 .try_with(|task_state| task_state.read().unwrap().priority)
1679 .unwrap_or(TaskPriority::initial())
1680 }
1681
1682 fn program_duration_until(&self, instant: Instant) -> Duration {
1683 instant - self.program_start
1684 }
1685
1686 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1687 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1689 }
1690
1691 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1692 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1694 }
1695
1696 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1697 unsafe { self.task_id_factory.reuse(id.into()) }
1698 }
1699
1700 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1701 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1702 }
1703
1704 fn is_idle(&self) -> bool {
1705 self.currently_scheduled_foreground_jobs
1706 .load(Ordering::Acquire)
1707 == 0
1708 }
1709}
1710
1711async fn wait_for_local_tasks() {
1712 while let Some(mut ltt) =
1713 CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1714 {
1715 use futures::StreamExt;
1716 while ltt.next().await.is_some() {}
1717 }
1718}
1719
1720pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1721 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1722 Ok(id) => id,
1723 Err(_) => panic!(
1724 "{from} can only be used in the context of a turbo_tasks task execution or \
1725 turbo_tasks run"
1726 ),
1727 }
1728}
1729
1730pub(crate) fn current_task(from: &str) -> TaskId {
1731 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1732 Ok(Some(id)) => id,
1733 Ok(None) | Err(_) => {
1734 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1735 }
1736 }
1737}
1738
1739#[track_caller]
1742pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1743 if !cfg!(debug_assertions) {
1744 return;
1745 }
1746
1747 let in_top_level = CURRENT_TASK_STATE
1748 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1749 .unwrap_or(true);
1750 if !in_top_level {
1751 panic!("{message}");
1752 }
1753}
1754
1755#[track_caller]
1756pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1757 if !cfg!(debug_assertions) {
1758 return;
1759 }
1760
1761 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1764 .try_with(|&suppressed| suppressed)
1765 .unwrap_or(false);
1766 if suppressed {
1767 return;
1768 }
1769
1770 let in_top_level = CURRENT_TASK_STATE
1771 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1772 .unwrap_or(false);
1773 if in_top_level {
1774 panic!(
1775 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1776 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1777 reads to avoid leaking inconsistent return values."
1778 );
1779 }
1780}
1781
1782pub async fn run<T: Send + 'static>(
1783 tt: Arc<dyn TurboTasksApi>,
1784 future: impl Future<Output = Result<T>> + Send + 'static,
1785) -> Result<T> {
1786 let (tx, rx) = tokio::sync::oneshot::channel();
1787
1788 tt.run(Box::pin(async move {
1789 let result = future.await?;
1790 tx.send(result)
1791 .map_err(|_| anyhow!("unable to send result"))?;
1792 Ok(())
1793 }))
1794 .await?;
1795
1796 Ok(rx.await?)
1797}
1798
1799pub async fn run_once<T: Send + 'static>(
1800 tt: Arc<dyn TurboTasksApi>,
1801 future: impl Future<Output = Result<T>> + Send + 'static,
1802) -> Result<T> {
1803 let (tx, rx) = tokio::sync::oneshot::channel();
1804
1805 tt.run_once(Box::pin(async move {
1806 let result = future.await?;
1807 tx.send(result)
1808 .map_err(|_| anyhow!("unable to send result"))?;
1809 Ok(())
1810 }))
1811 .await?;
1812
1813 Ok(rx.await?)
1814}
1815
1816pub async fn run_once_with_reason<T: Send + 'static>(
1817 tt: Arc<dyn TurboTasksApi>,
1818 reason: impl InvalidationReason,
1819 future: impl Future<Output = Result<T>> + Send + 'static,
1820) -> Result<T> {
1821 let (tx, rx) = tokio::sync::oneshot::channel();
1822
1823 tt.run_once_with_reason(
1824 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1825 Box::pin(async move {
1826 let result = future.await?;
1827 tx.send(result)
1828 .map_err(|_| anyhow!("unable to send result"))?;
1829 Ok(())
1830 }),
1831 )
1832 .await?;
1833
1834 Ok(rx.await?)
1835}
1836
1837pub fn dynamic_call(
1839 func: &'static NativeFunction,
1840 this: Option<RawVc>,
1841 arg: Box<dyn MagicAny>,
1842 persistence: TaskPersistence,
1843) -> RawVc {
1844 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1845}
1846
1847pub fn trait_call(
1849 trait_method: &'static TraitMethod,
1850 this: RawVc,
1851 arg: Box<dyn MagicAny>,
1852 persistence: TaskPersistence,
1853) -> RawVc {
1854 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1855}
1856
1857pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1858 TURBO_TASKS.with(|arc| arc.clone())
1859}
1860
1861pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1862 TURBO_TASKS.with(Arc::downgrade)
1863}
1864
1865pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1866 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1867}
1868
1869pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1870 TURBO_TASKS.with(|arc| func(arc))
1871}
1872
/// Runs the synchronous closure `f` with `tt` installed as the `TURBO_TASKS` task-local and
/// returns the closure's result.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1876
/// Wraps the future `f` so that `tt` is installed as the `TURBO_TASKS` task-local while it is
/// polled.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1883
1884pub fn with_turbo_tasks_for_testing<T>(
1885 tt: Arc<dyn TurboTasksApi>,
1886 current_task: TaskId,
1887 execution_id: ExecutionId,
1888 f: impl Future<Output = T>,
1889) -> impl Future<Output = T> {
1890 TURBO_TASKS.scope(
1891 tt,
1892 CURRENT_TASK_STATE.scope(
1893 Arc::new(RwLock::new(CurrentTaskState::new(
1894 current_task,
1895 execution_id,
1896 TaskPriority::initial(),
1897 false, ))),
1899 f,
1900 ),
1901 )
1902}
1903
1904pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1909 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1910}
1911
1912pub fn current_task_for_testing() -> Option<TaskId> {
1913 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1914}
1915
1916pub fn mark_session_dependent() {
1918 with_turbo_tasks(|tt| {
1919 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1920 });
1921}
1922
1923pub fn mark_finished() {
1926 with_turbo_tasks(|tt| {
1927 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1928 });
1929}
1930
1931pub fn get_serialization_invalidator() -> SerializationInvalidator {
1937 CURRENT_TASK_STATE.with(|cell| {
1938 let CurrentTaskState {
1939 task_id,
1940 #[cfg(feature = "verify_determinism")]
1941 stateful,
1942 ..
1943 } = &mut *cell.write().unwrap();
1944 #[cfg(feature = "verify_determinism")]
1945 {
1946 *stateful = true;
1947 }
1948 let Some(task_id) = *task_id else {
1949 panic!(
1950 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1951 task execution"
1952 );
1953 };
1954 SerializationInvalidator::new(task_id)
1955 })
1956}
1957
1958pub fn mark_invalidator() {
1959 CURRENT_TASK_STATE.with(|cell| {
1960 let CurrentTaskState {
1961 has_invalidator, ..
1962 } = &mut *cell.write().unwrap();
1963 *has_invalidator = true;
1964 })
1965}
1966
/// Sets the `stateful` flag on the current task state.
///
/// Does nothing unless the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1982
1983pub fn mark_top_level_task() {
1987 if cfg!(debug_assertions) {
1988 CURRENT_TASK_STATE.with(|cell| {
1989 cell.write().unwrap().in_top_level_task = true;
1990 })
1991 }
1992}
1993
1994pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
2005 if cfg!(debug_assertions) {
2006 CURRENT_TASK_STATE.with(|cell| {
2007 cell.write().unwrap().in_top_level_task = false;
2008 })
2009 }
2010}
2011
/// Currently a no-op: the body is empty.
///
/// NOTE(review): presumably a placeholder for opting the current task out of garbage
/// collection — confirm the intended semantics.
pub fn prevent_gc() {
}
2015
2016pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
2017 with_turbo_tasks(|tt| {
2018 let raw_vc = collectible.node.node;
2019 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2020 })
2021}
2022
2023pub(crate) async fn read_task_output(
2024 this: &dyn TurboTasksApi,
2025 id: TaskId,
2026 options: ReadOutputOptions,
2027) -> Result<RawVc> {
2028 loop {
2029 match this.try_read_task_output(id, options)? {
2030 Ok(result) => return Ok(result),
2031 Err(listener) => listener.await,
2032 }
2033 }
2034}
2035
/// Identifies a cell by owning task and cell id, together with a flag saying whether its content
/// is serializable.
///
/// Values are created by `find_cell_by_id`, which uses the current task and hands out the next
/// index per value type.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// Task that owns the cell.
    current_task: TaskId,
    /// Value type id and index of the cell.
    index: CellId,
    /// Passed through to the cell read and update calls.
    is_serializable_cell_content: bool,
}
2047
/// Shorthand for the `Target` type of `T`'s [`VcRead`] implementation.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2049
2050impl CurrentCellRef {
2051 fn conditional_update<T>(
2053 &self,
2054 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
2055 ) where
2056 T: VcValueType,
2057 {
2058 self.conditional_update_with_shared_reference(|old_shared_reference| {
2059 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2060 let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
2061 Some((
2062 SharedReference::new(triomphe::Arc::new(new_value)),
2063 updated_key_hashes,
2064 content_hash,
2065 ))
2066 })
2067 }
2068
2069 fn conditional_update_with_shared_reference(
2071 &self,
2072 functor: impl FnOnce(
2073 Option<&SharedReference>,
2074 ) -> Option<(
2075 SharedReference,
2076 Option<SmallVec<[u64; 2]>>,
2077 Option<CellHash>,
2078 )>,
2079 ) {
2080 let tt = turbo_tasks();
2081 let cell_content = tt
2082 .read_own_task_cell(
2083 self.current_task,
2084 self.index,
2085 ReadCellOptions {
2086 tracking: ReadCellTracking::Untracked,
2088 is_serializable_cell_content: self.is_serializable_cell_content,
2089 final_read_hint: false,
2090 },
2091 )
2092 .ok();
2093 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2094 if let Some((update, updated_key_hashes, content_hash)) = update {
2095 tt.update_own_task_cell(
2096 self.current_task,
2097 self.index,
2098 self.is_serializable_cell_content,
2099 CellContent(Some(update)),
2100 updated_key_hashes,
2101 content_hash,
2102 VerificationMode::EqualityCheck,
2103 )
2104 }
2105 }
2106
2107 pub fn compare_and_update<T>(&self, new_value: T)
2141 where
2142 T: PartialEq + VcValueType,
2143 {
2144 self.conditional_update(|old_value| {
2145 if let Some(old_value) = old_value
2146 && old_value == &new_value
2147 {
2148 return None;
2149 }
2150 Some((new_value, None, None))
2151 });
2152 }
2153
2154 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2162 where
2163 T: VcValueType + PartialEq,
2164 {
2165 self.conditional_update_with_shared_reference(|old_sr| {
2166 if let Some(old_sr) = old_sr {
2167 let old_value = extract_sr_value::<T>(old_sr);
2168 let new_value = extract_sr_value::<T>(&new_shared_reference);
2169 if old_value == new_value {
2170 return None;
2171 }
2172 }
2173 Some((new_shared_reference, None, None))
2174 });
2175 }
2176
2177 pub fn hashed_compare_and_update<T>(&self, new_value: T)
2186 where
2187 T: PartialEq + DeterministicHash + VcValueType,
2188 {
2189 self.conditional_update(|old_value| {
2190 if let Some(old_value) = old_value
2191 && old_value == &new_value
2192 {
2193 return None;
2194 }
2195 let content_hash = hash_xxh3_hash128(&new_value);
2196 Some((new_value, None, Some(content_hash)))
2197 });
2198 }
2199
2200 pub fn hashed_compare_and_update_with_shared_reference<T>(
2206 &self,
2207 new_shared_reference: SharedReference,
2208 ) where
2209 T: VcValueType + PartialEq + DeterministicHash,
2210 {
2211 self.conditional_update_with_shared_reference(move |old_sr| {
2212 if let Some(old_sr) = old_sr {
2213 let old_value = extract_sr_value::<T>(old_sr);
2214 let new_value = extract_sr_value::<T>(&new_shared_reference);
2215 if old_value == new_value {
2216 return None;
2217 }
2218 }
2219 let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2220 Some((new_shared_reference, None, Some(content_hash)))
2221 });
2222 }
2223
2224 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2226 where
2227 T: PartialEq + VcValueType,
2228 VcReadTarget<T>: KeyedEq,
2229 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2230 {
2231 self.conditional_update(|old_value| {
2232 let Some(old_value) = old_value else {
2233 return Some((new_value, None, None));
2234 };
2235 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2236 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2237 let updated_keys = old_value.different_keys(new_value_ref);
2238 if updated_keys.is_empty() {
2239 return None;
2240 }
2241 let updated_key_hashes = updated_keys
2243 .into_iter()
2244 .map(|key| FxBuildHasher.hash_one(key))
2245 .collect();
2246 Some((new_value, Some(updated_key_hashes), None))
2247 });
2248 }
2249
2250 pub fn keyed_compare_and_update_with_shared_reference<T>(
2253 &self,
2254 new_shared_reference: SharedReference,
2255 ) where
2256 T: VcValueType + PartialEq,
2257 VcReadTarget<T>: KeyedEq,
2258 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2259 {
2260 self.conditional_update_with_shared_reference(|old_sr| {
2261 let Some(old_sr) = old_sr else {
2262 return Some((new_shared_reference, None, None));
2263 };
2264 let old_value = extract_sr_value::<T>(old_sr);
2265 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2266 let new_value = extract_sr_value::<T>(&new_shared_reference);
2267 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2268 let updated_keys = old_value.different_keys(new_value);
2269 if updated_keys.is_empty() {
2270 return None;
2271 }
2272 let updated_key_hashes = updated_keys
2274 .into_iter()
2275 .map(|key| FxBuildHasher.hash_one(key))
2276 .collect();
2277 Some((new_shared_reference, Some(updated_key_hashes), None))
2278 });
2279 }
2280
2281 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2283 where
2284 T: VcValueType,
2285 {
2286 let tt = turbo_tasks();
2287 tt.update_own_task_cell(
2288 self.current_task,
2289 self.index,
2290 self.is_serializable_cell_content,
2291 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2292 None,
2293 None,
2294 verification_mode,
2295 )
2296 }
2297
2298 pub fn update_with_shared_reference(
2306 &self,
2307 shared_ref: SharedReference,
2308 verification_mode: VerificationMode,
2309 ) {
2310 let tt = turbo_tasks();
2311 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2312 let content = tt
2313 .read_own_task_cell(
2314 self.current_task,
2315 self.index,
2316 ReadCellOptions {
2317 tracking: ReadCellTracking::Untracked,
2319 is_serializable_cell_content: self.is_serializable_cell_content,
2320 final_read_hint: false,
2321 },
2322 )
2323 .ok();
2324 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2325 shared_ref_exp != shared_ref
2327 } else {
2328 true
2329 }
2330 } else {
2331 true
2332 };
2333 if update {
2334 tt.update_own_task_cell(
2335 self.current_task,
2336 self.index,
2337 self.is_serializable_cell_content,
2338 CellContent(Some(shared_ref)),
2339 None,
2340 None,
2341 verification_mode,
2342 )
2343 }
2344 }
2345}
2346
2347impl From<CurrentCellRef> for RawVc {
2348 fn from(cell: CurrentCellRef) -> Self {
2349 RawVc::TaskCell(cell.current_task, cell.index)
2350 }
2351}
2352
2353fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2354 sr.0.downcast_ref::<T>()
2355 .expect("cannot update SharedReference of different type")
2356}
2357
2358pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2359 find_cell_by_id(T::get_value_type_id(), T::has_serialization())
2360}
2361
2362pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2363 CURRENT_TASK_STATE.with(|ts| {
2364 let current_task = current_task("celling turbo_tasks values");
2365 let mut ts = ts.write().unwrap();
2366 let map = ts.cell_counters.as_mut().unwrap();
2367 let current_index = map.entry(ty).or_default();
2368 let index = *current_index;
2369 *current_index += 1;
2370 CurrentCellRef {
2371 current_task,
2372 index: CellId { type_id: ty, index },
2373 is_serializable_cell_content,
2374 }
2375 })
2376}
2377
2378pub(crate) async fn read_local_output(
2379 this: &dyn TurboTasksApi,
2380 execution_id: ExecutionId,
2381 local_task_id: LocalTaskId,
2382) -> Result<RawVc> {
2383 loop {
2384 match this.try_read_local_output(execution_id, local_task_id)? {
2385 Ok(raw_vc) => return Ok(raw_vc),
2386 Err(event_listener) => event_listener.await,
2387 }
2388 }
2389}