1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 pin::Pin,
8 sync::{
9 Arc, Mutex, RwLock, Weak,
10 atomic::{AtomicBool, AtomicUsize, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
26
27use crate::{
28 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
29 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
30 VcValueTrait, VcValueType,
31 backend::{
32 Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
33 TurboTasksExecutionError, TypedCellContent, VerificationMode,
34 },
35 capture_future::CaptureFuture,
36 dyn_task_inputs::StackDynTaskInputs,
37 event::{Event, EventListener},
38 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
39 id_factory::IdFactoryWithReuse,
40 keyed::KeyedEq,
41 macro_helpers::NativeFunction,
42 message_queue::{CompilationEvent, CompilationEventQueue},
43 priority_runner::{Executor, JoinHandle, PriorityRunner},
44 raw_vc::{CellId, RawVc},
45 registry,
46 serialization_invalidation::SerializationInvalidator,
47 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
48 task_statistics::TaskStatisticsApi,
49 trace::TraceRawVcs,
50 util::{IdFactory, StaticOrArc},
51};
52
/// Object-safe entry points for invoking turbo-tasks functions and running
/// top-level futures.
///
/// Implemented by [`TurboTasks`]; exposed as a trait object so callers do not
/// need to know the concrete backend type.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with the given receiver and arguments, resolving
    /// `this`/`arg` first if they are not already resolved.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` directly, assuming `this` and `arg` are already
    /// resolved.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on `this`, dispatching to the concrete native
    /// function once the receiver's value type is known.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` inside a top-level turbo-tasks scope, returning its
    /// result (panics are surfaced as [`TurboTasksExecutionError`]).
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-shot transient task and resolves when it
    /// completes.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like `run_once`, but records `reason` in the aggregated invalidation
    /// reason set for update reporting.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` detached on the runtime as a one-shot process.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Publishes a compilation event to subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for `task` (for diagnostics).
    fn get_task_name(&self, task: TaskId) -> String;
}
105
/// Full object-safe API for interacting with a running turbo-tasks instance:
/// invalidation, reading task outputs and cells, and collectibles.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Marks `task` as invalidated so it will be re-executed.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason` for update reporting.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates only the persisted/serialized state of `task`.
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`. `Ok(Err(listener))` means the
    /// output is not ready yet; await the listener and retry.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read a cell of `task`; the inner `Err(listener)` signals the
    /// cell is not ready yet.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Tries to read the output of a local task that belongs to the execution
    /// identified by `execution_id`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of type `trait_id` emitted by `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Retracts `count` emissions of `collectible` from the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Retracts all given collectibles from the current task.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Reads a cell of the currently executing task (no dependency tracking
    /// needed since the task owns the cell).
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads a cell of the given (own) task, propagating errors.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into a cell of the currently executing task.
    /// `updated_key_hashes`/`content_hash` are optional precomputed hashes;
    /// `verification_mode` controls consistency checking by the backend.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks the currently executing task as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks the currently executing task as dependent on the current
    /// session (so it is re-evaluated in a new session).
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Creates a dependency edge from the current task to `task`.
    fn connect_task(&self, task: TaskId);

    /// Spawns a detached future for test scenarios.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Access to task statistics collection.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the instance and resolves once all work has completed.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally filtered by event type
    /// names.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Whether this instance tracks dependencies between tasks.
    fn is_tracking_dependencies(&self) -> bool;
}
197
/// A wrapper marking a value as "not yet in use" — e.g. a freshly allocated
/// [`TaskId`] handed out by an id factory (see
/// `TurboTasksBackendApi::get_fresh_persistent_task_id`). The unsafe
/// constructor/accessor make the caller responsible for upholding that claim.
pub struct Unused<T> {
    inner: T,
}
202
impl<T> Unused<T> {
    /// Wraps `inner` as unused.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `inner` is genuinely unused (e.g. a
    /// task id that has not been handed out to anyone else).
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    /// Borrows the wrapped value without consuming the "unused" claim.
    ///
    /// # Safety
    ///
    /// The caller must not use the borrowed value in a way that would make
    /// it "used" while it remains wrapped.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Unwraps the value, giving up the "unused" marker (safe: ownership of
    /// the wrapper implies permission to start using the value).
    pub fn into(self) -> T {
        self.inner
    }
}
227
/// The interface a [`Backend`] uses to talk back to the `TurboTasks`
/// instance that drives it (id allocation, scheduling, backend jobs).
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns a strong reference to this instance as a trait object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a new persistent [`TaskId`] that is not yet in use.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a new transient [`TaskId`] that is not yet in use.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Returns a persistent task id to the factory for reuse.
    ///
    /// # Safety
    ///
    /// The id must have been allocated by this instance and must no longer
    /// be referenced anywhere.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Returns a transient task id to the factory for reuse.
    ///
    /// # Safety
    ///
    /// The id must have been allocated by this instance and must no longer
    /// be referenced anywhere.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution at the given priority.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the currently executing task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules a backend-defined job counted as foreground work.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend-defined job counted as background work.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Duration from program start until `instant`.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Whether the instance currently has no scheduled work.
    fn is_idle(&self) -> bool;

    /// Access to the backend itself.
    fn backend(&self) -> &B;
}
267
/// Aggregated statistics about a completed batch of foreground work,
/// produced by `aggregated_update_info`.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Total wall-clock time spent on the batch.
    pub duration: Duration,
    /// Number of tasks scheduled during the batch.
    pub tasks: usize,
    /// Invalidation reasons collected while the batch ran.
    pub reasons: InvalidationReasonSet,
    // Private field keeps the struct non-exhaustive so fields can be added
    // without breaking构造 callers — see the clippy allow above.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
276
/// Whether a task's state survives the current process/session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// The task may be persisted and restored by the backend.
    Persistent,

    /// The task lives only for the current session.
    Transient,
}
290
291impl Display for TaskPersistence {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 match self {
294 TaskPersistence::Persistent => write!(f, "persistent"),
295 TaskPersistence::Transient => write!(f, "transient"),
296 }
297 }
298}
299
/// Consistency level requested when reading a task's output.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// Accept a possibly-stale value (the default).
    #[default]
    Eventual,
    /// Wait until the value is fully up to date before returning.
    Strong,
}
312
/// Dependency-tracking behavior for a cell read.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// Register a dependency on the cell; `key` optionally narrows the
    /// dependency to a keyed part of the cell's content.
    Tracked {
        key: Option<u64>,
    },
    /// Register a dependency only when the read produces an error.
    TrackOnlyError,
    /// Never register a dependency.
    Untracked,
}
331
332impl ReadCellTracking {
333 pub fn should_track(&self, is_err: bool) -> bool {
334 match self {
335 ReadCellTracking::Tracked { .. } => true,
336 ReadCellTracking::TrackOnlyError => is_err,
337 ReadCellTracking::Untracked => false,
338 }
339 }
340
341 pub fn key(&self) -> Option<u64> {
342 match self {
343 ReadCellTracking::Tracked { key } => *key,
344 ReadCellTracking::TrackOnlyError => None,
345 ReadCellTracking::Untracked => None,
346 }
347 }
348}
349
350impl Default for ReadCellTracking {
351 fn default() -> Self {
352 ReadCellTracking::Tracked { key: None }
353 }
354}
355
356impl Display for ReadCellTracking {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 match self {
359 ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
360 ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
361 ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
362 ReadCellTracking::Untracked => write!(f, "untracked"),
363 }
364 }
365}
366
/// Dependency-tracking behavior for an output read (like
/// [`ReadCellTracking`], but without a per-key variant).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Register a dependency on the output (the default).
    #[default]
    Tracked,
    /// Register a dependency only when the read produces an error.
    TrackOnlyError,
    /// Never register a dependency.
    Untracked,
}
383
384impl ReadTracking {
385 pub fn should_track(&self, is_err: bool) -> bool {
386 match self {
387 ReadTracking::Tracked => true,
388 ReadTracking::TrackOnlyError => is_err,
389 ReadTracking::Untracked => false,
390 }
391 }
392}
393
394impl Display for ReadTracking {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 match self {
397 ReadTracking::Tracked => write!(f, "tracked"),
398 ReadTracking::TrackOnlyError => write!(f, "track only error"),
399 ReadTracking::Untracked => write!(f, "untracked"),
400 }
401 }
402}
403
/// Scheduling priority of a task.
///
/// The derived `Ord` makes `Initial` compare below any `Invalidation`, and —
/// because the inner value is wrapped in [`Reverse`] — an invalidation with a
/// smaller raw `u32` compares greater than one with a larger raw value.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority for tasks executing for the first time (the default).
    #[default]
    Initial,
    /// Priority for re-executions triggered by invalidation; raw value is
    /// stored reversed (see type-level docs for the ordering).
    Invalidation {
        priority: Reverse<u32>,
    },
}
412
413impl TaskPriority {
414 pub fn invalidation(priority: u32) -> Self {
415 Self::Invalidation {
416 priority: Reverse(priority),
417 }
418 }
419
420 pub fn initial() -> Self {
421 Self::Initial
422 }
423
424 pub fn leaf() -> Self {
425 Self::Invalidation {
426 priority: Reverse(0),
427 }
428 }
429
430 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
431 match self {
432 TaskPriority::Initial => parent_priority,
433 TaskPriority::Invalidation { priority } => {
434 if let TaskPriority::Invalidation {
435 priority: parent_priority,
436 } = parent_priority
437 && priority.0 < parent_priority.0
438 {
439 Self::Invalidation {
440 priority: Reverse(parent_priority.0.saturating_add(1)),
441 }
442 } else {
443 *self
444 }
445 }
446 }
447 }
448}
449
450impl Display for TaskPriority {
451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452 match self {
453 TaskPriority::Initial => write!(f, "initial"),
454 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
455 }
456 }
457}
458
/// A unit of work queued on the [`PriorityRunner`].
enum ScheduledTask {
    /// A regular backend task, identified by id; `span` is the tracing span
    /// active at scheduling time.
    Task {
        task_id: TaskId,
        span: Span,
    },
    /// A local (resolve) task that runs within the state of its parent task.
    LocalTask {
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        local_task_id: LocalTaskId,
        /// Shared state of the parent task this local task belongs to.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        span: Span,
    },
}
472
/// The central turbo-tasks engine: owns the backend, allocates ids, and
/// schedules task execution via a priority runner.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference so methods can hand out `Arc<Self>` (see `pin`).
    this: Weak<Self>,
    backend: B,
    /// Id allocator for persistent tasks (ids below `TRANSIENT_TASK_BIT`).
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id allocator for transient tasks (ids with `TRANSIENT_TASK_BIT` set).
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; checked before starting new executions.
    stopped: AtomicBool,
    /// Counter of in-flight foreground work; reaching 0 signals idle.
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Tasks scheduled since the last idle point (reported in `UpdateInfo`).
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Start time of the current foreground batch, if one is running.
    start: Mutex<Option<Instant>>,
    /// Accumulated (duration, task count) and invalidation reasons since the
    /// last `aggregated_update_info` consumption.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    event_foreground_start: Event,
    event_foreground_done: Event,
    event_background_done: Event,
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
497
/// Lazily-created set of futures for in-flight local tasks of one execution;
/// either runner join handles or directly-boxed futures.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
501
/// Mutable per-execution state of the currently running task, stored in the
/// `CURRENT_TASK_STATE` task-local.
struct CurrentTaskState {
    /// Id of the running task; `None` for temporary top-level scopes created
    /// by `TurboTasks::run`.
    task_id: Option<TaskId>,
    /// Unique id of this particular execution; local task handles are only
    /// valid within the same execution.
    execution_id: ExecutionId,
    priority: TaskPriority,

    // Whether the task accessed state, making it non-deterministic
    // (only tracked when determinism verification is enabled).
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Whether the task registered an invalidator during this execution.
    has_invalidator: bool,

    /// Whether this execution is a top-level (root/once) task.
    in_top_level_task: bool,

    /// Per-value-type counters for assigning cell indices; `None` for
    /// temporary scopes that cannot create cells.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created during this execution, indexed by
    /// `LocalTaskId - 1` (ids are 1-based; see `create_local_task`).
    local_tasks: Vec<LocalTask>,

    /// Futures of in-flight local tasks, awaited before the execution ends.
    local_task_tracker: LocalTaskTracker,
}
540
impl CurrentTaskState {
    /// State for a real task execution identified by `task_id`.
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: Some(task_id),
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            // Real tasks can create cells, so counters start empty.
            cell_counters: Some(AutoMap::default()),
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// State for a temporary scope (e.g. `TurboTasks::run`) that has no task
    /// id and cannot create cells (`cell_counters` stays `None`).
    fn new_temporary(
        execution_id: ExecutionId,
        priority: TaskPriority,
        in_top_level_task: bool,
    ) -> Self {
        Self {
            task_id: None,
            execution_id,
            priority,
            #[cfg(feature = "verify_determinism")]
            stateful: false,
            has_invalidator: false,
            in_top_level_task,
            cell_counters: None,
            local_tasks: Vec::new(),
            local_task_tracker: None,
        }
    }

    /// Panics if `expected_execution_id` does not match this execution —
    /// local task handles must not leak across executions.
    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }

    /// Registers a local task and returns its 1-based id (the new length of
    /// `local_tasks`; ids are 1-based so `LocalTaskId` can be non-zero).
    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
        self.local_tasks.push(local_task);
        if cfg!(debug_assertions) {
            // Checked conversion in debug builds to catch overflow.
            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
        } else {
            // SAFETY: len() >= 1 after the push above, so the id is non-zero.
            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
        }
    }

    /// Looks up a local task by its 1-based id.
    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
        &self.local_tasks[(*local_task_id as usize) - 1]
    }

    /// Mutable lookup of a local task by its 1-based id.
    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
        &mut self.local_tasks[(*local_task_id as usize) - 1]
    }
}
609
task_local! {
    /// The `TurboTasks` instance driving the current tokio task.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// Per-execution state of the task running on the current tokio task.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    // NOTE(review): presumably disables an eventual-consistency assertion for
    // top-level tasks; the check itself is defined elsewhere — confirm there.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
623
impl<B: Backend + 'static> TurboTasks<B> {
    /// Creates a new engine around `backend`, splitting the `TaskId` space at
    /// `TRANSIENT_TASK_BIT` between persistent and transient ids, and calls
    /// `backend.startup`.
    pub fn new(backend: B) -> Arc<Self> {
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        // new_cyclic lets the instance hold a weak reference to itself.
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        this.backend.startup(&*this);
        this
    }

    /// Upgrades the weak self-reference to a strong `Arc`.
    pub fn pin(&self) -> Arc<Self> {
        self.this.upgrade().unwrap()
    }

    /// Creates and schedules a root task that re-runs `functor` whenever the
    /// backend decides to re-execute it. Returns its id.
    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                let functor = functor.clone();
                Box::pin(async move {
                    mark_top_level_task();
                    let raw_vc = functor().await?.node;
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
        id
    }

    /// Removes a previously spawned root task from the backend.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }

    /// Creates and schedules a one-shot transient task wrapping `future`.
    #[track_caller]
    fn spawn_once_task<T, Fut>(&self, future: Fut)
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                mark_top_level_task();
                let raw_vc = future.await?.node;
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id, TaskPriority::initial());
    }

    /// Runs `future` as a once task and returns its result via a oneshot
    /// channel.
    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.spawn_once_task(async move {
            mark_top_level_task();
            let result = future.await;
            // The receiver is dropped only if this call is cancelled.
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(Completion::new())
        });

        rx.await?
    }

    /// Runs `future` inside a temporary top-level task scope on the calling
    /// tokio task (no backend task is created), waiting for any local tasks
    /// it spawned before returning. Panics are converted into
    /// [`TurboTasksExecutionError::Panic`].
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        let execution_id = self.execution_id_factory.wrapping_get();
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true, )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    // CaptureFuture catches panics so they can be reported
                    // as execution errors instead of unwinding.
                    let result = CaptureFuture::new(future).await;

                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }

    /// Spawns `future` detached as a once task. The foreground counter is
    /// released while the future runs and re-acquired afterwards, so a
    /// long-lived process does not keep the instance "busy".
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }

    /// Gets or creates the backend task for `native_fn(this, arg)` and
    /// returns its output reference. Assumes inputs are already resolved.
    pub(crate) fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        RawVc::TaskOutput(self.backend.get_or_create_task(
            native_fn,
            this,
            arg,
            current_task_if_available("turbo_function calls"),
            persistence,
            self,
        ))
    }

    /// Calls `native_fn`, going through a local resolve task first if `this`
    /// or `arg` are not yet resolved.
    pub fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        // Fast path: everything already resolved, call directly.
        if this.is_none_or(|this| this.is_resolved())
            && native_fn.arg_meta.is_resolved(arg.as_ref())
        {
            return self.native_call(native_fn, this, arg, persistence);
        }
        let arg = arg.take_box();
        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveNative { native_fn },
            this,
            arg,
        };
        self.schedule_local_task(task_type, persistence)
    }

    /// Calls `trait_method` on `this`. If the receiver's concrete value type
    /// is already known (it is a cell), the call is dispatched directly;
    /// otherwise a local resolve-trait task is scheduled.
    pub fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match registry::get_value_type(type_id).get_trait_method(trait_method) {
                Some(native_fn) => {
                    // Some methods pre-filter the argument set before
                    // dispatch.
                    if let Some(filter) = native_fn.arg_meta.filter_owned {
                        let mut arg = (filter)(arg);
                        return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
                    } else {
                        return self.dynamic_call(native_fn, Some(this), arg, persistence);
                    }
                }
                None => {
                    // No direct impl found; fall through to the resolve-trait
                    // path below, which reports trait-method errors.
                }
            }
        }

        let task_type = LocalTaskSpec {
            task_type: LocalTaskType::ResolveTrait { trait_method },
            this: Some(this),
            arg: arg.take_box(),
        };

        self.schedule_local_task(task_type, persistence)
    }

    /// Schedules `task_id` for execution; the matching
    /// `finish_foreground_job` happens in the executor when the task's
    /// execution loop ends.
    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
        self.begin_foreground_job();
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        self.priority_runner.schedule(
            &self.pin(),
            ScheduledTask::Task {
                task_id,
                span: Span::current(),
            },
            priority,
        );
    }

    /// Registers a local task in the current task's state, schedules it on
    /// the runner, and returns a `RawVc::LocalOutput` handle that is only
    /// valid within the current execution.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Track the join handle so the parent execution waits for this local
        // task before completing (see wait_for_local_tasks).
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }

    /// Increments the foreground counter; on the 0→1 transition, records the
    /// batch start time and notifies the backend that idle ended.
    fn begin_foreground_job(&self) {
        if self
            .currently_scheduled_foreground_jobs
            .fetch_add(1, Ordering::AcqRel)
            == 0
        {
            *self.start.lock().unwrap() = Some(Instant::now());
            self.event_foreground_start.notify(usize::MAX);
            self.backend.idle_end(self);
        }
    }

    /// Decrements the foreground counter; on the 1→0 transition, folds the
    /// finished batch into `aggregated_update` and signals completion.
    fn finish_foreground_job(&self) {
        if self
            .currently_scheduled_foreground_jobs
            .fetch_sub(1, Ordering::AcqRel)
            == 1
        {
            self.backend.idle_start(self);
            // NOTE(review): load + store is not a single atomic swap — a task
            // scheduled between the two calls would be dropped from the
            // count. `swap(0, ...)` would close that window; confirm whether
            // the approximation is intentional.
            let total = self.scheduled_tasks.load(Ordering::Acquire);
            self.scheduled_tasks.store(0, Ordering::Release);
            if let Some(start) = *self.start.lock().unwrap() {
                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
                if let Some(update) = update.as_mut() {
                    update.0 += start.elapsed();
                    update.1 += total;
                } else {
                    *update = Some((start.elapsed(), total));
                }
            }
            self.event_foreground_done.notify(usize::MAX);
        }
    }

    /// Increments the background job counter.
    fn begin_background_job(&self) {
        self.currently_scheduled_background_jobs
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the background job counter, signalling when it hits zero.
    fn finish_background_job(&self) {
        if self
            .currently_scheduled_background_jobs
            .fetch_sub(1, Ordering::Relaxed)
            == 1
        {
            self.event_background_done.notify(usize::MAX);
        }
    }

    /// Number of currently in-flight foreground jobs.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }

    /// Waits until `id` has produced an output (without tracking a
    /// dependency), discarding the value.
    pub async fn wait_task_completion(
        &self,
        id: TaskId,
        consistency: ReadConsistency,
    ) -> Result<()> {
        read_task_output(
            self,
            id,
            ReadOutputOptions {
                tracking: ReadTracking::Untracked,
                consistency,
            },
        )
        .await?;
        Ok(())
    }

    /// Like [`Self::aggregated_update_info`] with an infinite timeout, so an
    /// update is always eventually returned.
    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
        self.aggregated_update_info(aggregation, Duration::MAX)
            .await
            .unwrap()
    }

    /// Waits (bounded by `timeout`) for foreground work to finish, then keeps
    /// aggregating for as long as new batches complete within `aggregation`.
    /// Returns `None` on timeout. Must not be called concurrently with
    /// itself.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        let listener = self
            .event_foreground_done
            .listen_with_note(|| || "wait for update info".to_string());
        let wait_for_finish = {
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                // No aggregation window: return a pending update immediately
                // if there is one.
                if let Some((duration, tasks)) = update.take() {
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                listener.await;
            } else {
                // With a finite timeout, first wait for work to even start
                // if nothing is in flight yet.
                let start_listener = self
                    .event_foreground_start
                    .listen_with_note(|| || "wait for update info".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    == 0
                {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            // Keep extending the window while batches keep completing.
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not called concurrently")
        }
    }

    /// Waits until all background jobs have completed.
    pub async fn wait_background_done(&self) {
        let listener = self.event_background_done.listen();
        if self
            .currently_scheduled_background_jobs
            .load(Ordering::Acquire)
            != 0
        {
            listener.await;
        }
    }

    /// Stops the instance: flags it as stopped, waits for foreground and
    /// background work to drain, then tells the backend to stop.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            self.stopped.store(true, Ordering::Release);
            {
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }

    /// Spawns `func` on the runtime as foreground work (skipped if the
    /// instance has already stopped).
    #[track_caller]
    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        this.begin_foreground_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        this = func(this.clone()).await;
                    }
                    this.finish_foreground_job();
                })
                .in_current_span(),
        );
    }

    /// Spawns `func` on the runtime as background work (skipped if the
    /// instance has already stopped).
    #[track_caller]
    pub(crate) fn schedule_background_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        self.begin_background_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        this = func(this).await;
                    }
                    this.finish_background_job();
                })
                .in_current_span(),
        );
    }

    /// Snapshots the flags of the current task state that the backend needs
    /// when an execution completes.
    fn finish_current_task_state(&self) -> FinishedTaskState {
        CURRENT_TASK_STATE.with(|cell| {
            let current_task_state = &*cell.write().unwrap();
            FinishedTaskState {
                #[cfg(feature = "verify_determinism")]
                stateful: current_task_state.stateful,
                has_invalidator: current_task_state.has_invalidator,
            }
        })
    }

    /// Access to the backend.
    pub fn backend(&self) -> &B {
        &self.backend
    }
}
1186
/// Stateless [`Executor`] that runs [`ScheduledTask`]s for the priority
/// runner.
struct TurboTasksExecutor;
1188
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future that executes one scheduled (backend or local) task
    /// inside the proper task-local scopes.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    // The backend may request immediate re-execution by
                    // returning true from task_execution_completed.
                    let mut schedule_again = true;
                    while schedule_again {
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, )));
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return false;
                            }

                            // The backend may decline to start (e.g. task no
                            // longer dirty); then nothing more to do.
                            let Some(TaskExecutionSpec { future, span }) = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)
                            else {
                                return false;
                            };

                            async {
                                // CaptureFuture catches panics so they can be
                                // reported as execution errors.
                                let result = CaptureFuture::new(future).await;

                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        schedule_again = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await;
                    }
                    // Pairs with the begin_foreground_job in schedule().
                    this.finish_foreground_job();
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    async move {
                        let result = match ty.task_type {
                            LocalTaskType::ResolveNative { native_fn } => {
                                LocalTaskType::run_resolve_native(
                                    native_fn,
                                    ty.this,
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                            LocalTaskType::ResolveTrait { trait_method } => {
                                LocalTaskType::run_resolve_trait(
                                    trait_method,
                                    ty.this.unwrap(),
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                        };

                        let output = match result {
                            Ok(raw_vc) => OutputContent::Link(raw_vc),
                            Err(err) => OutputContent::Error(
                                TurboTasksExecutionError::from(err)
                                    .with_local_task_context(task_type.to_string()),
                            ),
                        };

                        let local_task = LocalTask::Done { output };

                        // Swap the Scheduled entry for the Done result and
                        // wake everyone awaiting this local task.
                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
                            let mut gts_write = gts.write().unwrap();
                            let scheduled_task = std::mem::replace(
                                gts_write.get_mut_local_task(local_task_id),
                                local_task,
                            );
                            let LocalTask::Scheduled { done_event } = scheduled_task else {
                                panic!("local task finished, but was not in the scheduled state?");
                            };
                            done_event
                        });
                        done_event.notify(usize::MAX)
                    }
                    .instrument(span)
                    .await
                };
                // Local tasks run inside their parent task's state.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1340
/// Subset of [`CurrentTaskState`] flags reported to the backend when an
/// execution completes.
struct FinishedTaskState {
    // Whether the task accessed state during execution (only tracked when
    // determinism verification is enabled).
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Whether the task registered an invalidator during execution.
    has_invalidator: bool,
}
1350
/// Object-safe forwarding of the call API to the concrete `TurboTasks<B>`.
///
/// Each method delegates to the inherent method of the same name on
/// `TurboTasks<B>` — Rust resolves `self.dynamic_call(..)` etc. to the
/// inherent method (inherent methods take precedence over trait methods),
/// so these are not recursive calls.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        // `pin()` yields an owned `Arc` so the boxed future can be 'static.
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        // Record the invalidation reason first; the run itself is identical to
        // `run_once`. The lock is scoped to a block so it is released before
        // the future is constructed.
        {
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        // Best-effort: a failed send is logged, not propagated.
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1427
1428impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1429 #[instrument(level = "info", skip_all, name = "invalidate")]
1430 fn invalidate(&self, task: TaskId) {
1431 self.backend.invalidate_task(task, self);
1432 }
1433
1434 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1435 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1436 {
1437 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1438 reason_set.insert(reason);
1439 }
1440 self.backend.invalidate_task(task, self);
1441 }
1442
1443 fn invalidate_serialization(&self, task: TaskId) {
1444 self.backend.invalidate_serialization(task, self);
1445 }
1446
1447 #[track_caller]
1448 fn try_read_task_output(
1449 &self,
1450 task: TaskId,
1451 options: ReadOutputOptions,
1452 ) -> Result<Result<RawVc, EventListener>> {
1453 if options.consistency == ReadConsistency::Eventual {
1454 debug_assert_not_in_top_level_task("read_task_output");
1455 }
1456 self.backend.try_read_task_output(
1457 task,
1458 current_task_if_available("reading Vcs"),
1459 options,
1460 self,
1461 )
1462 }
1463
1464 #[track_caller]
1465 fn try_read_task_cell(
1466 &self,
1467 task: TaskId,
1468 index: CellId,
1469 options: ReadCellOptions,
1470 ) -> Result<Result<TypedCellContent, EventListener>> {
1471 let reader = current_task_if_available("reading Vcs");
1472 self.backend
1473 .try_read_task_cell(task, index, reader, options, self)
1474 }
1475
1476 fn try_read_own_task_cell(
1477 &self,
1478 current_task: TaskId,
1479 index: CellId,
1480 ) -> Result<TypedCellContent> {
1481 self.backend
1482 .try_read_own_task_cell(current_task, index, self)
1483 }
1484
1485 #[track_caller]
1486 fn try_read_local_output(
1487 &self,
1488 execution_id: ExecutionId,
1489 local_task_id: LocalTaskId,
1490 ) -> Result<Result<RawVc, EventListener>> {
1491 debug_assert_not_in_top_level_task("read_local_output");
1492 CURRENT_TASK_STATE.with(|gts| {
1493 let gts_read = gts.read().unwrap();
1494
1495 gts_read.assert_execution_id(execution_id);
1500
1501 match gts_read.get_local_task(local_task_id) {
1502 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1503 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1504 }
1505 })
1506 }
1507
1508 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1509 self.backend.read_task_collectibles(
1512 task,
1513 trait_id,
1514 current_task_if_available("reading collectibles"),
1515 self,
1516 )
1517 }
1518
1519 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1520 self.backend.emit_collectible(
1521 trait_type,
1522 collectible,
1523 current_task("emitting collectible"),
1524 self,
1525 );
1526 }
1527
1528 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1529 self.backend.unemit_collectible(
1530 trait_type,
1531 collectible,
1532 count,
1533 current_task("emitting collectible"),
1534 self,
1535 );
1536 }
1537
1538 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1539 for (&collectible, &count) in collectibles {
1540 if count > 0 {
1541 self.backend.unemit_collectible(
1542 trait_type,
1543 collectible,
1544 count as u32,
1545 current_task("emitting collectible"),
1546 self,
1547 );
1548 }
1549 }
1550 }
1551
1552 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1553 self.try_read_own_task_cell(task, index)
1554 }
1555
1556 fn update_own_task_cell(
1557 &self,
1558 task: TaskId,
1559 index: CellId,
1560 content: CellContent,
1561 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1562 content_hash: Option<CellHash>,
1563 verification_mode: VerificationMode,
1564 ) {
1565 self.backend.update_task_cell(
1566 task,
1567 index,
1568 content,
1569 updated_key_hashes,
1570 content_hash,
1571 verification_mode,
1572 self,
1573 );
1574 }
1575
1576 fn connect_task(&self, task: TaskId) {
1577 self.backend
1578 .connect_task(task, current_task_if_available("connecting task"), self);
1579 }
1580
1581 fn mark_own_task_as_finished(&self, task: TaskId) {
1582 self.backend.mark_own_task_as_finished(task, self);
1583 }
1584
1585 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1586 self.backend.mark_own_task_as_session_dependent(task, self);
1587 }
1588
1589 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1592 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1595 let fut = tokio::spawn(TURBO_TASKS.scope(
1596 turbo_tasks(),
1597 CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1598 ));
1599 let fut = Box::pin(async move {
1600 fut.await.unwrap();
1601 });
1602 let mut ts = global_task_state.write().unwrap();
1603 ts.local_task_tracker
1604 .get_or_insert_default()
1605 .push(Either::Right(fut));
1606 }
1607
1608 fn task_statistics(&self) -> &TaskStatisticsApi {
1609 self.backend.task_statistics()
1610 }
1611
1612 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1613 let this = self.pin();
1614 Box::pin(async move {
1615 this.stop_and_wait().await;
1616 })
1617 }
1618
1619 fn subscribe_to_compilation_events(
1620 &self,
1621 event_types: Option<Vec<String>>,
1622 ) -> Receiver<Arc<dyn CompilationEvent>> {
1623 self.compilation_events.subscribe(event_types)
1624 }
1625
1626 fn is_tracking_dependencies(&self) -> bool {
1627 self.backend.is_tracking_dependencies()
1628 }
1629}
1630
/// Backend-facing API: scheduling hooks, task-id allocation/reuse, and a few
/// runtime queries.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        // The closure receives and returns the Arc so the job keeps the
        // runtime alive while it runs.
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            this
        })
    }

    #[track_caller]
    fn schedule(&self, task: TaskId, priority: TaskPriority) {
        self.schedule(task, priority)
    }

    /// Priority of the task currently executing on this thread; falls back to
    /// the initial priority outside of any task context.
    fn get_current_task_priority(&self) -> TaskPriority {
        CURRENT_TASK_STATE
            .try_with(|task_state| task_state.read().unwrap().priority)
            .unwrap_or(TaskPriority::initial())
    }

    // NOTE(review): `Instant - Instant` panics if `instant` is earlier than
    // `program_start` — presumably callers only pass instants taken after
    // startup; confirm.
    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY: the factory hands out ids that have never been used (or
        // were explicitly returned via `reuse_persistent_task_id`).
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY: same invariant as above, for the transient id space.
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    // "Idle" here means no foreground jobs are scheduled; background jobs are
    // not considered.
    fn is_idle(&self) -> bool {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
    }
}
1694
1695async fn wait_for_local_tasks() {
1696 while let Some(mut ltt) =
1697 CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1698 {
1699 use futures::StreamExt;
1700 while ltt.next().await.is_some() {}
1701 }
1702}
1703
1704pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1705 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1706 Ok(id) => id,
1707 Err(_) => panic!(
1708 "{from} can only be used in the context of a turbo_tasks task execution or \
1709 turbo_tasks run"
1710 ),
1711 }
1712}
1713
1714pub(crate) fn current_task(from: &str) -> TaskId {
1715 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1716 Ok(Some(id)) => id,
1717 Ok(None) | Err(_) => {
1718 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1719 }
1720 }
1721}
1722
1723#[track_caller]
1726pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1727 if !cfg!(debug_assertions) {
1728 return;
1729 }
1730
1731 let in_top_level = CURRENT_TASK_STATE
1732 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1733 .unwrap_or(true);
1734 if !in_top_level {
1735 panic!("{message}");
1736 }
1737}
1738
1739#[track_caller]
1740pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1741 if !cfg!(debug_assertions) {
1742 return;
1743 }
1744
1745 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1748 .try_with(|&suppressed| suppressed)
1749 .unwrap_or(false);
1750 if suppressed {
1751 return;
1752 }
1753
1754 let in_top_level = CURRENT_TASK_STATE
1755 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1756 .unwrap_or(false);
1757 if in_top_level {
1758 panic!(
1759 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1760 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1761 reads to avoid leaking inconsistent return values."
1762 );
1763 }
1764}
1765
1766pub async fn run<T: Send + 'static>(
1767 tt: Arc<dyn TurboTasksApi>,
1768 future: impl Future<Output = Result<T>> + Send + 'static,
1769) -> Result<T> {
1770 let (tx, rx) = tokio::sync::oneshot::channel();
1771
1772 tt.run(Box::pin(async move {
1773 let result = future.await?;
1774 tx.send(result)
1775 .map_err(|_| anyhow!("unable to send result"))?;
1776 Ok(())
1777 }))
1778 .await?;
1779
1780 Ok(rx.await?)
1781}
1782
1783pub async fn run_once<T: Send + 'static>(
1784 tt: Arc<dyn TurboTasksApi>,
1785 future: impl Future<Output = Result<T>> + Send + 'static,
1786) -> Result<T> {
1787 let (tx, rx) = tokio::sync::oneshot::channel();
1788
1789 tt.run_once(Box::pin(async move {
1790 let result = future.await?;
1791 tx.send(result)
1792 .map_err(|_| anyhow!("unable to send result"))?;
1793 Ok(())
1794 }))
1795 .await?;
1796
1797 Ok(rx.await?)
1798}
1799
1800pub async fn run_once_with_reason<T: Send + 'static>(
1801 tt: Arc<dyn TurboTasksApi>,
1802 reason: impl InvalidationReason,
1803 future: impl Future<Output = Result<T>> + Send + 'static,
1804) -> Result<T> {
1805 let (tx, rx) = tokio::sync::oneshot::channel();
1806
1807 tt.run_once_with_reason(
1808 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1809 Box::pin(async move {
1810 let result = future.await?;
1811 tx.send(result)
1812 .map_err(|_| anyhow!("unable to send result"))?;
1813 Ok(())
1814 }),
1815 )
1816 .await?;
1817
1818 Ok(rx.await?)
1819}
1820
1821pub fn dynamic_call(
1823 func: &'static NativeFunction,
1824 this: Option<RawVc>,
1825 arg: &mut dyn StackDynTaskInputs,
1826 persistence: TaskPersistence,
1827) -> RawVc {
1828 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1829}
1830
1831pub fn trait_call(
1833 trait_method: &'static TraitMethod,
1834 this: RawVc,
1835 arg: &mut dyn StackDynTaskInputs,
1836 persistence: TaskPersistence,
1837) -> RawVc {
1838 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1839}
1840
1841pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1842 TURBO_TASKS.with(|arc| arc.clone())
1843}
1844
1845pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1846 TURBO_TASKS.with(Arc::downgrade)
1847}
1848
1849pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1850 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1851}
1852
1853pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1854 TURBO_TASKS.with(|arc| func(arc))
1855}
1856
1857pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1858 TURBO_TASKS.sync_scope(tt, f)
1859}
1860
1861pub fn turbo_tasks_future_scope<T>(
1862 tt: Arc<dyn TurboTasksApi>,
1863 f: impl Future<Output = T>,
1864) -> impl Future<Output = T> {
1865 TURBO_TASKS.scope(tt, f)
1866}
1867
1868pub fn with_turbo_tasks_for_testing<T>(
1869 tt: Arc<dyn TurboTasksApi>,
1870 current_task: TaskId,
1871 execution_id: ExecutionId,
1872 f: impl Future<Output = T>,
1873) -> impl Future<Output = T> {
1874 TURBO_TASKS.scope(
1875 tt,
1876 CURRENT_TASK_STATE.scope(
1877 Arc::new(RwLock::new(CurrentTaskState::new(
1878 current_task,
1879 execution_id,
1880 TaskPriority::initial(),
1881 false, ))),
1883 f,
1884 ),
1885 )
1886}
1887
1888pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1893 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1894}
1895
1896pub fn current_task_for_testing() -> Option<TaskId> {
1897 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1898}
1899
1900pub fn mark_session_dependent() {
1902 with_turbo_tasks(|tt| {
1903 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1904 });
1905}
1906
1907pub fn mark_finished() {
1910 with_turbo_tasks(|tt| {
1911 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1912 });
1913}
1914
/// Returns a [`SerializationInvalidator`] for the currently executing task.
///
/// When determinism verification is enabled this also flags the task as
/// stateful, since holding an invalidator implies external state.
///
/// # Panics
/// Panics when called outside a task execution context (no task id set).
pub fn get_serialization_invalidator() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        let CurrentTaskState {
            task_id,
            #[cfg(feature = "verify_determinism")]
            stateful,
            ..
        } = &mut *cell.write().unwrap();
        #[cfg(feature = "verify_determinism")]
        {
            *stateful = true;
        }
        let Some(task_id) = *task_id else {
            panic!(
                "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
                 task execution"
            );
        };
        SerializationInvalidator::new(task_id)
    })
}
1941
1942pub fn mark_invalidator() {
1943 CURRENT_TASK_STATE.with(|cell| {
1944 let CurrentTaskState {
1945 has_invalidator, ..
1946 } = &mut *cell.write().unwrap();
1947 *has_invalidator = true;
1948 })
1949}
1950
/// Flags the current task as stateful for determinism verification.
///
/// No-op unless the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1966
1967pub fn mark_top_level_task() {
1971 if cfg!(debug_assertions) {
1972 CURRENT_TASK_STATE.with(|cell| {
1973 cell.write().unwrap().in_top_level_task = true;
1974 })
1975 }
1976}
1977
1978pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1989 if cfg!(debug_assertions) {
1990 CURRENT_TASK_STATE.with(|cell| {
1991 cell.write().unwrap().in_top_level_task = false;
1992 })
1993 }
1994}
1995
/// Currently a no-op. NOTE(review): presumably kept as a stable hook for a
/// garbage-collection opt-out — confirm against callers/backend GC support.
pub fn prevent_gc() {
}
1999
2000pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
2001 with_turbo_tasks(|tt| {
2002 let raw_vc = collectible.node.node;
2003 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2004 })
2005}
2006
2007pub(crate) async fn read_task_output(
2008 this: &dyn TurboTasksApi,
2009 id: TaskId,
2010 options: ReadOutputOptions,
2011) -> Result<RawVc> {
2012 loop {
2013 match this.try_read_task_output(id, options)? {
2014 Ok(result) => return Ok(result),
2015 Err(listener) => listener.await,
2016 }
2017 }
2018}
2019
/// Handle to one cell of the currently executing task, addressed by the
/// owning task id and the cell's (type, index) pair.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    // Task that owns the cell (the task that was current when the ref was
    // created via `find_cell_by_id`).
    current_task: TaskId,
    // (value type, per-type index) identifying the cell within the task.
    index: CellId,
}
2030
/// Shorthand for the target type that a `VcValueType`'s `Read` representation
/// exposes (what `value_to_target_ref` yields).
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2032
impl CurrentCellRef {
    /// Conditionally updates the cell with a typed value.
    ///
    /// `functor` receives the current value (if any, and if it downcasts to
    /// `T`) and returns `Some((new_value, updated_key_hashes, content_hash))`
    /// to write, or `None` to leave the cell untouched.
    fn conditional_update<T>(
        &self,
        functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
    ) where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            // A failed downcast is treated the same as an empty cell.
            let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
            let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
            Some((
                SharedReference::new(triomphe::Arc::new(new_value)),
                updated_key_hashes,
                content_hash,
            ))
        })
    }

    /// Untyped variant of [`Self::conditional_update`]: reads the current
    /// cell content, lets `functor` decide whether to write, and writes with
    /// `VerificationMode::EqualityCheck`.
    ///
    /// NOTE(review): read and write are two separate calls, not an atomic
    /// read-modify-write — presumably safe because only the owning task
    /// writes its own cells; confirm.
    fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(
            Option<&SharedReference>,
        ) -> Option<(
            SharedReference,
            Option<SmallVec<[u64; 2]>>,
            Option<CellHash>,
        )>,
    ) {
        let tt = turbo_tasks();
        // A read error is treated as "no current content".
        let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some((update, updated_key_hashes, content_hash)) = update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(update)),
                updated_key_hashes,
                content_hash,
                VerificationMode::EqualityCheck,
            )
        }
    }

    /// Writes `new_value` only if it differs (by `PartialEq`) from the
    /// current cell content; otherwise leaves the cell (and its dependents)
    /// untouched.
    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            Some((new_value, None, None))
        });
    }

    /// Like [`Self::compare_and_update`], but takes an untyped
    /// `SharedReference`; both old and new are downcast to `T` for the
    /// equality check (panics if the reference holds a different type).
    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some((new_shared_reference, None, None))
        });
    }

    /// Like [`Self::compare_and_update`], but additionally stores an xxh3
    /// content hash of the new value alongside it.
    pub fn hashed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + DeterministicHash + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value
                && old_value == &new_value
            {
                return None;
            }
            // Hash only computed when we actually write.
            let content_hash = hash_xxh3_hash128(&new_value);
            Some((new_value, None, Some(content_hash)))
        });
    }

    /// `SharedReference` variant of [`Self::hashed_compare_and_update`].
    pub fn hashed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq + DeterministicHash,
    {
        self.conditional_update_with_shared_reference(move |old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value = extract_sr_value::<T>(old_sr);
                let new_value = extract_sr_value::<T>(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
            Some((new_shared_reference, None, Some(content_hash)))
        });
    }

    /// Keyed variant of [`Self::compare_and_update`]: uses `KeyedEq` to find
    /// which keys changed, skips the write when none did, and records the
    /// hashes of the changed keys so dependents can be invalidated per key.
    pub fn keyed_compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update(|old_value| {
            // Empty cell: write unconditionally, no key diff available.
            let Some(old_value) = old_value else {
                return Some((new_value, None, None));
            };
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
            let updated_keys = old_value.different_keys(new_value_ref);
            if updated_keys.is_empty() {
                return None;
            }
            // Keys are stored as Fx hashes, matching the backend's lookup.
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_value, Some(updated_key_hashes), None))
        });
    }

    /// `SharedReference` variant of [`Self::keyed_compare_and_update`]
    /// (panics if either reference holds a type other than `T`).
    pub fn keyed_compare_and_update_with_shared_reference<T>(
        &self,
        new_shared_reference: SharedReference,
    ) where
        T: VcValueType + PartialEq,
        VcReadTarget<T>: KeyedEq,
        <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
    {
        self.conditional_update_with_shared_reference(|old_sr| {
            let Some(old_sr) = old_sr else {
                return Some((new_shared_reference, None, None));
            };
            let old_value = extract_sr_value::<T>(old_sr);
            let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
            let new_value = extract_sr_value::<T>(&new_shared_reference);
            let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
            let updated_keys = old_value.different_keys(new_value);
            if updated_keys.is_empty() {
                return None;
            }
            let updated_key_hashes = updated_keys
                .into_iter()
                .map(|key| FxBuildHasher.hash_one(key))
                .collect();
            Some((new_shared_reference, Some(updated_key_hashes), None))
        });
    }

    /// Unconditionally writes `new_value` into the cell (no comparison with
    /// the current content is performed here).
    pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
            None,
            None,
            verification_mode,
        )
    }

    /// Writes `shared_ref` into the cell. Under
    /// `VerificationMode::EqualityCheck` the write is skipped when the
    /// current content compares equal to `shared_ref`; otherwise the write is
    /// unconditional.
    pub fn update_with_shared_reference(
        &self,
        shared_ref: SharedReference,
        verification_mode: VerificationMode,
    ) {
        let tt = turbo_tasks();
        let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
            let content = tt.read_own_task_cell(self.current_task, self.index).ok();
            if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
                // Only write when the content actually changed.
                shared_ref_exp != shared_ref
            } else {
                true
            }
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(
                self.current_task,
                self.index,
                CellContent(Some(shared_ref)),
                None,
                None,
                verification_mode,
            )
        }
    }
}
2304
2305impl From<CurrentCellRef> for RawVc {
2306 fn from(cell: CurrentCellRef) -> Self {
2307 RawVc::TaskCell(cell.current_task, cell.index)
2308 }
2309}
2310
2311fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2312 sr.0.downcast_ref::<T>()
2313 .expect("cannot update SharedReference of different type")
2314}
2315
2316pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2317 find_cell_by_id(T::get_value_type_id())
2318}
2319
2320pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2321 CURRENT_TASK_STATE.with(|ts| {
2322 let current_task = current_task("celling turbo_tasks values");
2323 let mut ts = ts.write().unwrap();
2324 let map = ts.cell_counters.as_mut().unwrap();
2325 let current_index = map.entry(ty).or_default();
2326 let index = *current_index;
2327 *current_index += 1;
2328 CurrentCellRef {
2329 current_task,
2330 index: CellId { type_id: ty, index },
2331 }
2332 })
2333}
2334
2335pub(crate) async fn read_local_output(
2336 this: &dyn TurboTasksApi,
2337 execution_id: ExecutionId,
2338 local_task_id: LocalTaskId,
2339) -> Result<RawVc> {
2340 loop {
2341 match this.try_read_local_output(execution_id, local_task_id)? {
2342 Ok(raw_vc) => return Ok(raw_vc),
2343 Err(event_listener) => event_listener.await,
2344 }
2345 }
2346}