1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 pin::Pin,
8 sync::{
9 Arc, Mutex, RwLock, Weak,
10 atomic::{AtomicBool, AtomicUsize, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25
26use crate::{
27 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
28 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
29 VcValueTrait, VcValueType,
30 backend::{
31 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
32 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
33 },
34 capture_future::CaptureFuture,
35 event::{Event, EventListener},
36 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
37 id_factory::IdFactoryWithReuse,
38 keyed::KeyedEq,
39 macro_helpers::NativeFunction,
40 magic_any::MagicAny,
41 message_queue::{CompilationEvent, CompilationEventQueue},
42 priority_runner::{Executor, JoinHandle, PriorityRunner},
43 raw_vc::{CellId, RawVc},
44 registry,
45 serialization_invalidation::SerializationInvalidator,
46 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactory, StaticOrArc},
50};
51
/// Object-safe interface for calling turbo-tasks functions and running top-level futures.
///
/// [`TurboTasks`] implements this trait by delegating to its inherent methods of the same name.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with a receiver and/or argument that may not be resolved yet.
    ///
    /// In the [`TurboTasks`] implementation this goes straight to `native_call` when the
    /// receiver and argument are already resolved, and otherwise schedules a local task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` directly.
    ///
    /// The [`TurboTasks`] implementation asks the backend to get or create the matching cached
    /// task (persistent or transient, per `persistence`) and returns a `RawVc::TaskOutput`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on `this`.
    ///
    /// The [`TurboTasks`] implementation looks the method up in the registry when `this` is a
    /// `RawVc::TaskCell` and the value type provides it; in every other case it schedules a
    /// local task of type `LocalTaskType::ResolveTrait`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` within a turbo-tasks scope; failures are reported as
    /// [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-off task and resolves with its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`Self::run_once`], but additionally records `reason`; the [`TurboTasks`]
    /// implementation inserts it into its aggregated invalidation reason set.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` as a detached one-off process; the call does not wait for it.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Publishes `event`; the [`TurboTasks`] implementation sends it to its compilation event
    /// queue and logs a warning when sending fails.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a name for `task`; the [`TurboTasks`] implementation obtains it from the backend.
    fn get_task_name(&self, task: TaskId) -> String;
}
104
/// Object-safe API for invalidating tasks, reading task outputs and cells, handling
/// collectibles and updating the currently executing task.
///
/// Extends [`TurboTasksCallApi`]. A value of this trait object type is what the `TURBO_TASKS`
/// task-local in this file stores.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and attaches `reason` to the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    ///
    /// NOTE(review): presumably this marks the task as needing to be persisted again — confirm.
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read the output of `task`.
    ///
    /// The outer `Result` reports failures. The inner `Err` carries an [`EventListener`];
    /// presumably it should be awaited before retrying because the output is not available
    /// yet — TODO confirm against the backend.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read the cell `index` of `task`.
    ///
    /// Uses the same nested result shape as [`Self::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Attempts to read the output of the local task `local_task_id` that belongs to the
    /// execution `execution_id`.
    ///
    /// Uses the same nested result shape as [`Self::try_read_task_output`].
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of trait `trait_id` for `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Takes back `count` emissions of `collectible` for the trait `trait_type`.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Takes back every entry of `collectibles` for the trait `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Attempts to read cell `index` of `current_task`.
    ///
    /// NOTE(review): "own" presumably means the task that is currently executing — confirm.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads cell `index` of `task`.
    ///
    /// NOTE(review): how this differs from [`Self::try_read_own_task_cell`] is not visible in
    /// this file — confirm with the implementation.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Writes `content` into cell `index` of `task`.
    ///
    /// `is_serializable_cell_content`, `updated_key_hashes` and `verification_mode` are passed
    /// along with the update; their exact effect is defined by the implementation.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Sets the aggregation number of `task` to `aggregation_number`.
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    /// Marks `task` as session dependent.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task`.
    ///
    /// NOTE(review): presumably this links `task` as a child of the current task — confirm.
    fn connect_task(&self, task: TaskId);

    /// Spawns `f` detached; as the name says, intended for tests only.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Gives access to the task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and returns a future that completes once stopping is done.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events and returns the receiving end of the channel.
    ///
    /// NOTE(review): `event_types` presumably filters by event type name, with `None` meaning
    /// "all events" — confirm.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Reports whether dependency tracking is enabled.
    fn is_tracking_dependencies(&self) -> bool;
}
203
/// Wrapper around a value of type `T`.
///
/// NOTE(review): judging by the name and by its use for fresh task ids in
/// [`TurboTasksBackendApi`], this presumably marks a value that is not in use yet — confirm.
pub struct Unused<T> {
    /// The wrapped value.
    inner: T,
}
208
impl<T> Unused<T> {
    /// Wraps `inner` without performing any check.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; presumably the caller must
    /// guarantee that `inner` really is unused — confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; presumably the caller must
    /// not use the borrowed value in a way that makes it "used" — confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        self.inner
    }
}
233
/// The part of the turbo-tasks API that is offered to a [`Backend`] implementation `B`.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owning, reference-counted handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Hands out a fresh id for a persistent task.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Hands out a fresh id for a transient task.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back so it can be handed out again.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible in this file; presumably `id` must no longer
    /// be referenced anywhere — confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back so it can be handed out again.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible in this file; presumably `id` must no longer
    /// be referenced anywhere — confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution with the given `priority`.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the task that is currently executing.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules `job` as a foreground backend job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules `job` as a background backend job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns a duration relating `instant` to the program start.
    ///
    /// NOTE(review): presumably the time elapsed from program start until `instant` — confirm.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Reports whether the system is currently idle.
    fn is_idle(&self) -> bool;

    /// Gives access to the backend.
    fn backend(&self) -> &B;
}
273
/// Summary of finished foreground work, as returned by
/// `TurboTasks::aggregated_update_info`.
// The private placeholder field keeps the struct from being constructed outside this crate,
// hence the manual non-exhaustive pattern that the lint would otherwise flag.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated duration of the finished foreground phases.
    pub duration: Duration,
    /// Accumulated number of tasks scheduled during those phases.
    pub tasks: usize,
    /// Invalidation reasons collected since the info was last taken.
    pub reasons: InvalidationReasonSet,
    /// Private field so that new fields can be added later without breaking users.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
282
/// Selects which kind of task a function call creates.
///
/// `TurboTasks::native_call` uses it to choose between the backend's
/// `get_or_create_persistent_task` and `get_or_create_transient_task`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// The call is backed by a persistent task.
    Persistent,

    /// The call is backed by a transient task.
    Transient,
}
296
297impl Display for TaskPersistence {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 match self {
300 TaskPersistence::Persistent => write!(f, "persistent"),
301 TaskPersistence::Transient => write!(f, "transient"),
302 }
303 }
304}
305
/// Consistency level requested when reading a task output.
///
/// NOTE(review): the precise guarantees of each level are defined by the backend and are not
/// visible in this file — confirm there.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default level.
    #[default]
    Eventual,
    /// The stronger, non-default level.
    Strong,
}
318
/// Tracking mode for a cell read.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// The read is always tracked.
    Tracked {
        /// Optional key attached to the tracked read.
        key: Option<u64>,
    },
    /// The read is tracked only when it results in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the key of a [`ReadCellTracking::Tracked`] read, or `None` for any other mode.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = self {
            *key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// The default mode is a tracked read without a key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    /// Writes a short, human-readable description of the mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Untracked => f.write_str("untracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
        }
    }
}
372
/// Tracking mode for a read that has no key (compare [`ReadCellTracking`]).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// The read is always tracked. This is the default.
    #[default]
    Tracked,
    /// The read is tracked only when it results in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

impl Display for ReadTracking {
    /// Writes a short, human-readable description of the mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Untracked => "untracked",
            Self::TrackOnlyError => "track only error",
            Self::Tracked => "tracked",
        };
        f.write_str(label)
    }
}
409
/// Priority with which a task is scheduled on the priority runner.
///
/// The ordering is derived: `Initial` compares less than every `Invalidation`, and because the
/// number is wrapped in [`Reverse`], a larger number compares as less.
///
/// NOTE(review): whether the runner executes the greatest or the least value first is not
/// visible in this file — confirm in `priority_runner`.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority of work that was not triggered by an invalidation. This is the default.
    #[default]
    Initial,
    /// Priority of work triggered by an invalidation.
    Invalidation {
        /// The numeric priority, stored reversed for ordering purposes.
        priority: Reverse<u32>,
    },
}
418
419impl TaskPriority {
420 pub fn invalidation(priority: u32) -> Self {
421 Self::Invalidation {
422 priority: Reverse(priority),
423 }
424 }
425
426 pub fn initial() -> Self {
427 Self::Initial
428 }
429
430 pub fn leaf() -> Self {
431 Self::Invalidation {
432 priority: Reverse(0),
433 }
434 }
435
436 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
437 match self {
438 TaskPriority::Initial => parent_priority,
439 TaskPriority::Invalidation { priority } => {
440 if let TaskPriority::Invalidation {
441 priority: parent_priority,
442 } = parent_priority
443 && priority.0 < parent_priority.0
444 {
445 Self::Invalidation {
446 priority: Reverse(parent_priority.0.saturating_add(1)),
447 }
448 } else {
449 *self
450 }
451 }
452 }
453 }
454}
455
456impl Display for TaskPriority {
457 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
458 match self {
459 TaskPriority::Initial => write!(f, "initial"),
460 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
461 }
462 }
463}
464
/// A unit of work handed to the priority runner and executed by `TurboTasksExecutor`.
enum ScheduledTask {
    /// Execution of the backend task `task_id`.
    Task {
        /// The task to execute.
        task_id: TaskId,
        /// The tracing span that was current when the task was scheduled.
        span: Span,
    },
    /// Execution of a local task that belongs to a parent task's execution.
    LocalTask {
        /// What the local task has to resolve.
        ty: LocalTaskSpec,
        /// Persistence that is forwarded to the resolve step.
        persistence: TaskPersistence,
        /// Id of the local task within the parent's `local_tasks` list.
        local_task_id: LocalTaskId,
        /// Shared state of the parent task execution that created this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// The tracing span that was current when the local task was scheduled.
        span: Span,
    },
}
478
/// The central turbo-tasks manager: owns the backend, hands out ids and schedules task
/// executions and jobs.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference set up by `Arc::new_cyclic`; upgraded by `pin`.
    this: Weak<Self>,
    /// The backend that creates and executes tasks.
    backend: B,
    /// Ids for persistent tasks, from `TaskId::MIN` up to `TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Ids for transient tasks, from `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Source of execution ids; read with `wrapping_get`.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; once set, scheduled jobs and task executions are skipped.
    stopped: AtomicBool,
    /// Number of foreground jobs that have begun but not finished yet.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs that have begun but not finished yet.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of tasks scheduled since the counter was last reset in `finish_foreground_job`.
    scheduled_tasks: AtomicUsize,
    /// Runner that executes [`ScheduledTask`]s with a [`TaskPriority`].
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Instant at which the foreground job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` of finished foreground phases together with the
    /// collected invalidation reasons; drained by `aggregated_update_info`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job count goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job count drops back to zero.
    event_foreground_done: Event,
    /// Notified when the background job count drops back to zero.
    event_background_done: Event,
    /// Instant at which this instance was created.
    program_start: Instant,
    /// Queue that compilation events are sent to.
    compilation_events: CompilationEventQueue,
}
503
/// Collection of the futures of the local tasks spawned during one task execution.
///
/// It is `None` until the first future is pushed (see `schedule_local_task`). Entries are
/// either a [`JoinHandle`] from the priority runner or an arbitrary boxed future.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
507
/// State of one task execution; stored in the `CURRENT_TASK_STATE` task-local.
struct CurrentTaskState {
    /// The executing task, or `None` for the temporary state created by `TurboTasks::run`.
    task_id: Option<TaskId>,
    /// Identifies this execution; checked by `assert_execution_id`.
    execution_id: ExecutionId,
    /// Priority of this execution; local tasks are scheduled with the same priority.
    priority: TaskPriority,

    /// Only present with the `verify_determinism` feature; reported to the backend on
    /// completion.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Reported to the backend when the execution completes.
    ///
    /// NOTE(review): presumably set when the task obtained an invalidator — confirm.
    has_invalidator: bool,

    /// Counters per value type; `Some` for real task executions and taken out when the
    /// execution completes.
    ///
    /// NOTE(review): presumably counts the cells created per type — confirm.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// The local tasks created during this execution, indexed by `LocalTaskId - 1`.
    local_tasks: Vec<LocalTask>,

    /// Futures of the spawned local tasks, see [`LocalTaskTracker`].
    local_task_tracker: LocalTaskTracker,
}
542
543impl CurrentTaskState {
544 fn new(task_id: TaskId, execution_id: ExecutionId, priority: TaskPriority) -> Self {
545 Self {
546 task_id: Some(task_id),
547 execution_id,
548 priority,
549 #[cfg(feature = "verify_determinism")]
550 stateful: false,
551 has_invalidator: false,
552 cell_counters: Some(AutoMap::default()),
553 local_tasks: Vec::new(),
554 local_task_tracker: None,
555 }
556 }
557
558 fn new_temporary(execution_id: ExecutionId, priority: TaskPriority) -> Self {
559 Self {
560 task_id: None,
561 execution_id,
562 priority,
563 #[cfg(feature = "verify_determinism")]
564 stateful: false,
565 has_invalidator: false,
566 cell_counters: None,
567 local_tasks: Vec::new(),
568 local_task_tracker: None,
569 }
570 }
571
572 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
573 if self.execution_id != expected_execution_id {
574 panic!(
575 "Local tasks can only be scheduled/awaited within the same execution of the \
576 parent task that created them"
577 );
578 }
579 }
580
581 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
582 self.local_tasks.push(local_task);
583 if cfg!(debug_assertions) {
585 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
586 } else {
587 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
588 }
589 }
590
591 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
592 &self.local_tasks[(*local_task_id as usize) - 1]
594 }
595
596 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
597 &mut self.local_tasks[(*local_task_id as usize) - 1]
598 }
599}
600
task_local! {
    /// The turbo-tasks instance the current tokio task runs under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the task execution the current tokio task belongs to; shared with the local
    /// tasks spawned by that execution.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}
608
609impl<B: Backend + 'static> TurboTasks<B> {
610 pub fn new(backend: B) -> Arc<Self> {
616 let task_id_factory = IdFactoryWithReuse::new(
617 TaskId::MIN,
618 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
619 );
620 let transient_task_id_factory =
621 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
622 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
623 let this = Arc::new_cyclic(|this| Self {
624 this: this.clone(),
625 backend,
626 task_id_factory,
627 transient_task_id_factory,
628 execution_id_factory,
629 stopped: AtomicBool::new(false),
630 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
631 currently_scheduled_background_jobs: AtomicUsize::new(0),
632 scheduled_tasks: AtomicUsize::new(0),
633 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
634 start: Default::default(),
635 aggregated_update: Default::default(),
636 event_foreground_done: Event::new(|| {
637 || "TurboTasks::event_foreground_done".to_string()
638 }),
639 event_foreground_start: Event::new(|| {
640 || "TurboTasks::event_foreground_start".to_string()
641 }),
642 event_background_done: Event::new(|| {
643 || "TurboTasks::event_background_done".to_string()
644 }),
645 program_start: Instant::now(),
646 compilation_events: CompilationEventQueue::default(),
647 });
648 this.backend.startup(&*this);
649 this
650 }
651
    /// Returns a strong reference to this instance.
    ///
    /// # Panics
    ///
    /// Panics if the weak self-reference can no longer be upgraded.
    pub fn pin(&self) -> Arc<Self> {
        self.this.upgrade().unwrap()
    }
655
656 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
658 where
659 T: ?Sized,
660 F: Fn() -> Fut + Send + Sync + Clone + 'static,
661 Fut: Future<Output = Result<Vc<T>>> + Send,
662 {
663 let id = self.backend.create_transient_task(
664 TransientTaskType::Root(Box::new(move || {
665 let functor = functor.clone();
666 Box::pin(async move {
667 let raw_vc = functor().await?.node;
668 raw_vc.to_non_local().await
669 })
670 })),
671 self,
672 );
673 self.schedule(id, TaskPriority::initial());
674 id
675 }
676
    /// Asks the backend to dispose the root task `task_id`.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
680
681 #[track_caller]
685 fn spawn_once_task<T, Fut>(&self, future: Fut)
686 where
687 T: ?Sized,
688 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
689 {
690 let id = self.backend.create_transient_task(
691 TransientTaskType::Once(Box::pin(async move {
692 let raw_vc = future.await?.node;
693 raw_vc.to_non_local().await
694 })),
695 self,
696 );
697 self.schedule(id, TaskPriority::initial());
698 }
699
700 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
701 &self,
702 future: impl Future<Output = Result<T>> + Send + 'static,
703 ) -> Result<T> {
704 let (tx, rx) = tokio::sync::oneshot::channel();
705 self.spawn_once_task(async move {
706 let result = future.await;
707 tx.send(result)
708 .map_err(|_| anyhow!("unable to send result"))?;
709 Ok(Completion::new())
710 });
711
712 rx.await?
713 }
714
715 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
716 pub async fn run<T: TraceRawVcs + Send + 'static>(
717 &self,
718 future: impl Future<Output = Result<T>> + Send + 'static,
719 ) -> Result<T, TurboTasksExecutionError> {
720 self.begin_foreground_job();
721 let execution_id = self.execution_id_factory.wrapping_get();
723 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
724 execution_id,
725 TaskPriority::initial(),
726 )));
727
728 let result = TURBO_TASKS
729 .scope(
730 self.pin(),
731 CURRENT_TASK_STATE.scope(current_task_state, async {
732 let result = CaptureFuture::new(future).await;
733
734 wait_for_local_tasks().await;
736
737 match result {
738 Ok(Ok(raw_vc)) => Ok(raw_vc),
739 Ok(Err(err)) => Err(err.into()),
740 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
741 }
742 }),
743 )
744 .await;
745 self.finish_foreground_job();
746 result
747 }
748
    /// Spawns `future` on tokio as a detached process that runs inside a `run_once` task.
    ///
    /// The wrapper calls `finish_foreground_job` before awaiting `future` and
    /// `begin_foreground_job` afterwards, so the foreground job that was begun when the once
    /// task was scheduled is not counted while `future` itself is running.
    ///
    /// # Panics
    ///
    /// The spawned tokio task panics (through `unwrap`) when `run_once` returns an error.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    // Release the foreground job slot for the duration of the process ...
                    this.finish_foreground_job();
                    future.await;
                    // ... and take it again so that the task completion can release it.
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
763
764 pub(crate) fn native_call(
765 &self,
766 native_fn: &'static NativeFunction,
767 this: Option<RawVc>,
768 arg: Box<dyn MagicAny>,
769 persistence: TaskPersistence,
770 ) -> RawVc {
771 let task_type = CachedTaskType {
772 native_fn,
773 this,
774 arg,
775 };
776 RawVc::TaskOutput(match persistence {
777 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
778 task_type,
779 current_task_if_available("turbo_function calls"),
780 self,
781 ),
782 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
783 task_type,
784 current_task_if_available("turbo_function calls"),
785 self,
786 ),
787 })
788 }
789
790 pub fn dynamic_call(
791 &self,
792 native_fn: &'static NativeFunction,
793 this: Option<RawVc>,
794 arg: Box<dyn MagicAny>,
795 persistence: TaskPersistence,
796 ) -> RawVc {
797 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
798 return self.native_call(native_fn, this, arg, persistence);
799 }
800 let task_type = LocalTaskSpec {
801 task_type: LocalTaskType::ResolveNative { native_fn },
802 this,
803 arg,
804 };
805 self.schedule_local_task(task_type, persistence)
806 }
807
808 pub fn trait_call(
809 &self,
810 trait_method: &'static TraitMethod,
811 this: RawVc,
812 arg: Box<dyn MagicAny>,
813 persistence: TaskPersistence,
814 ) -> RawVc {
815 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
819 match registry::get_value_type(type_id).get_trait_method(trait_method) {
820 Some(native_fn) => {
821 let arg = native_fn.arg_meta.filter_owned(arg);
822 return self.dynamic_call(native_fn, Some(this), arg, persistence);
823 }
824 None => {
825 }
829 }
830 }
831
832 let task_type = LocalTaskSpec {
834 task_type: LocalTaskType::ResolveTrait { trait_method },
835 this: Some(this),
836 arg,
837 };
838
839 self.schedule_local_task(task_type, persistence)
840 }
841
842 #[track_caller]
843 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
844 self.begin_foreground_job();
845 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
846
847 self.priority_runner.schedule(
848 &self.pin(),
849 ScheduledTask::Task {
850 task_id,
851 span: Span::current(),
852 },
853 priority,
854 );
855 }
856
    /// Registers a new local task in the current task state, schedules it on the priority
    /// runner and returns a `RawVc::LocalOutput` referring to its eventual output.
    ///
    /// Must be called where the `CURRENT_TASK_STATE` task-local is set.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        // Forwarded to the scheduled local task and recorded in the returned `RawVc`.
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Reserve a slot (and with it the id) for the local task while holding the write lock.
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        // The local task runs with the priority of the task that spawned it.
        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Remember the join handle in the tracker of the spawning execution.
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
900
901 fn begin_foreground_job(&self) {
902 if self
903 .currently_scheduled_foreground_jobs
904 .fetch_add(1, Ordering::AcqRel)
905 == 0
906 {
907 *self.start.lock().unwrap() = Some(Instant::now());
908 self.event_foreground_start.notify(usize::MAX);
909 self.backend.idle_end(self);
910 }
911 }
912
913 fn finish_foreground_job(&self) {
914 if self
915 .currently_scheduled_foreground_jobs
916 .fetch_sub(1, Ordering::AcqRel)
917 == 1
918 {
919 self.backend.idle_start(self);
920 let total = self.scheduled_tasks.load(Ordering::Acquire);
923 self.scheduled_tasks.store(0, Ordering::Release);
924 if let Some(start) = *self.start.lock().unwrap() {
925 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
926 if let Some(update) = update.as_mut() {
927 update.0 += start.elapsed();
928 update.1 += total;
929 } else {
930 *update = Some((start.elapsed(), total));
931 }
932 }
933 self.event_foreground_done.notify(usize::MAX);
934 }
935 }
936
937 fn begin_background_job(&self) {
938 self.currently_scheduled_background_jobs
939 .fetch_add(1, Ordering::Relaxed);
940 }
941
942 fn finish_background_job(&self) {
943 if self
944 .currently_scheduled_background_jobs
945 .fetch_sub(1, Ordering::Relaxed)
946 == 1
947 {
948 self.event_background_done.notify(usize::MAX);
949 }
950 }
951
    /// Returns the number of foreground jobs that have begun but not finished yet.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
956
957 pub async fn wait_task_completion(
969 &self,
970 id: TaskId,
971 consistency: ReadConsistency,
972 ) -> Result<()> {
973 read_task_output(
974 self,
975 id,
976 ReadOutputOptions {
977 tracking: ReadTracking::Untracked,
979 consistency,
980 },
981 )
982 .await?;
983 Ok(())
984 }
985
    /// Returns the aggregated update info, waiting without a timeout until one is available.
    ///
    /// Delegates to [`Self::aggregated_update_info`] with `Duration::MAX` as timeout; that
    /// function only returns `None` on its timeout path, which is not taken for
    /// `Duration::MAX`, hence the `unwrap`.
    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
        self.aggregated_update_info(aggregation, Duration::MAX)
            .await
            .unwrap()
    }
993
    /// Takes the update info accumulated by finished foreground phases.
    ///
    /// * `aggregation`: when non-zero, the function keeps waiting until no foreground phase
    ///   has finished for this long, so that several phases are merged into one info. When
    ///   zero, an already available info is returned immediately.
    /// * `timeout`: with `Duration::MAX` the function waits until a foreground phase finishes.
    ///   Otherwise it first waits for foreground work to start (if none is running) and then
    ///   at most `timeout` for it to finish.
    ///
    /// Returns `None` when the timeout elapses, which includes a zero timeout.
    ///
    /// # Panics
    ///
    /// Panics when no info is available at the end, which happens when this function is called
    /// concurrently.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        // The listener is created before the state is inspected below.
        let listener = self
            .event_foreground_done
            .listen_with_note(|| || "wait for update info".to_string());
        let wait_for_finish = {
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                // Without aggregation an available info is handed out right away.
                if let Some((duration, tasks)) = update.take() {
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                // No timeout: wait for the foreground work to finish.
                listener.await;
            } else {
                // Wait for the start first, then for the finish or the timeout.
                let start_listener = self
                    .event_foreground_start
                    .listen_with_note(|| || "wait for update info".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    == 0
                {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    // Timed out.
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                        // Another phase finished: start over with a fresh sleep.
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not called concurrently")
        }
    }
1070
1071 pub async fn wait_background_done(&self) {
1072 let listener = self.event_background_done.listen();
1073 if self
1074 .currently_scheduled_background_jobs
1075 .load(Ordering::Acquire)
1076 != 0
1077 {
1078 listener.await;
1079 }
1080 }
1081
    /// Stops the system: informs the backend, sets the `stopped` flag, waits for the running
    /// foreground and background jobs to finish and finally calls the backend's `stop` hook.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            // From here on newly scheduled jobs and task executions are skipped.
            self.stopped.store(true, Ordering::Release);
            {
                // Wait for the foreground jobs; the listener is created before the check.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                // Wait for the background jobs in the same way.
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1112
    /// Spawns `func` on tokio as a foreground job within the `TURBO_TASKS` scope.
    ///
    /// The job is counted from the moment it is scheduled. `func` is skipped when the system
    /// has been stopped by the time the job runs; the job is finished either way.
    #[track_caller]
    pub(crate) fn schedule_foreground_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        this.begin_foreground_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        // `func` hands the `Arc` back when it is done.
                        this = func(this.clone()).await;
                    }
                    this.finish_foreground_job();
                })
                .in_current_span(),
        );
    }
1132
    /// Spawns `func` on tokio as a background job within the `TURBO_TASKS` scope.
    ///
    /// The job is counted from the moment it is scheduled. `func` is skipped when the system
    /// has been stopped by the time the job runs; the job is finished either way.
    #[track_caller]
    pub(crate) fn schedule_background_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        self.begin_background_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        // `func` takes the `Arc` and hands it back when it is done.
                        this = func(this).await;
                    }
                    this.finish_background_job();
                })
                .in_current_span(),
        );
    }
1152
1153 fn finish_current_task_state(&self) -> FinishedTaskState {
1154 CURRENT_TASK_STATE.with(|cell| {
1155 let current_task_state = &*cell.write().unwrap();
1156 FinishedTaskState {
1157 #[cfg(feature = "verify_determinism")]
1158 stateful: current_task_state.stateful,
1159 has_invalidator: current_task_state.has_invalidator,
1160 }
1161 })
1162 }
1163
    /// Gives access to the backend.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1167}
1168
/// Stateless executor that turns a [`ScheduledTask`] into the future the priority runner runs.
struct TurboTasksExecutor;
1170
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future that executes `scheduled_task`.
    ///
    /// * `ScheduledTask::Task`: executes the task through the backend, repeating for as long
    ///   as the backend's completion call asks for it, and then finishes the foreground job
    ///   that `TurboTasks::schedule` began.
    /// * `ScheduledTask::LocalTask`: runs the resolve step, stores the result in the parent's
    ///   task state and notifies the local task's `done_event`.
    ///
    /// Either future runs within the `TURBO_TASKS` scope and the scheduling span.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle moves into the future, the other one goes into the scope.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    let mut schedule_again = true;
                    while schedule_again {
                        // Every execution gets a new id and a new task state.
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                        )));
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return false;
                            }

                            // The backend may decline to start the execution.
                            let Some(TaskExecutionSpec { future, span }) = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)
                            else {
                                return false;
                            };

                            async {
                                let result = CaptureFuture::new(future).await;

                                // Local tasks spawned by the execution are awaited as well.
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finihed_state = this.finish_current_task_state();
                                // `CurrentTaskState::new` always sets the counters, so the
                                // `unwrap` cannot fail on the first take.
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                // The returned flag decides whether the loop runs once more.
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finihed_state.stateful,
                                    finihed_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        schedule_again = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await;
                    }
                    // Counterpart of the `begin_foreground_job` in `TurboTasks::schedule`.
                    this.finish_foreground_job();
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    async move {
                        let result = match ty.task_type {
                            LocalTaskType::ResolveNative { native_fn } => {
                                LocalTaskType::run_resolve_native(
                                    native_fn,
                                    ty.this,
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                            LocalTaskType::ResolveTrait { trait_method } => {
                                LocalTaskType::run_resolve_trait(
                                    trait_method,
                                    ty.this.unwrap(),
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                        };

                        let output = match result {
                            Ok(raw_vc) => OutputContent::Link(raw_vc),
                            Err(err) => OutputContent::Error(
                                TurboTasksExecutionError::from(err)
                                    .with_local_task_context(task_type.to_string()),
                            ),
                        };

                        let local_task = LocalTask::Done { output };

                        // Swap the scheduled entry for the finished one and keep its event.
                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
                            let mut gts_write = gts.write().unwrap();
                            let scheduled_task = std::mem::replace(
                                gts_write.get_mut_local_task(local_task_id),
                                local_task,
                            );
                            let LocalTask::Scheduled { done_event } = scheduled_task else {
                                panic!("local task finished, but was not in the scheduled state?");
                            };
                            done_event
                        });
                        // Notify only after the write lock has been released.
                        done_event.notify(usize::MAX)
                    }
                    .instrument(span)
                    .await
                };
                // The local task shares the task state of the execution that spawned it.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1321
/// The flags of a `CurrentTaskState` that are reported to the backend when a task execution
/// completes.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`; only present with the `verify_determinism`
    /// feature.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1331
1332impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to the inherent `TurboTasks::dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
1342 fn native_call(
1343 &self,
1344 native_fn: &'static NativeFunction,
1345 this: Option<RawVc>,
1346 arg: Box<dyn MagicAny>,
1347 persistence: TaskPersistence,
1348 ) -> RawVc {
1349 self.native_call(native_fn, this, arg, persistence)
1350 }
1351 fn trait_call(
1352 &self,
1353 trait_method: &'static TraitMethod,
1354 this: RawVc,
1355 arg: Box<dyn MagicAny>,
1356 persistence: TaskPersistence,
1357 ) -> RawVc {
1358 self.trait_call(trait_method, this, arg, persistence)
1359 }
1360
1361 #[track_caller]
1362 fn run(
1363 &self,
1364 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1365 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1366 let this = self.pin();
1367 Box::pin(async move { this.run(future).await })
1368 }
1369
1370 #[track_caller]
1371 fn run_once(
1372 &self,
1373 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1374 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1375 let this = self.pin();
1376 Box::pin(async move { this.run_once(future).await })
1377 }
1378
1379 #[track_caller]
1380 fn run_once_with_reason(
1381 &self,
1382 reason: StaticOrArc<dyn InvalidationReason>,
1383 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1384 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1385 {
1386 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1387 reason_set.insert(reason);
1388 }
1389 let this = self.pin();
1390 Box::pin(async move { this.run_once(future).await })
1391 }
1392
1393 #[track_caller]
1394 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1395 self.start_once_process(future)
1396 }
1397
1398 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1399 if let Err(e) = self.compilation_events.send(event) {
1400 tracing::warn!("Failed to send compilation event: {e}");
1401 }
1402 }
1403
1404 fn get_task_name(&self, task: TaskId) -> String {
1405 self.backend.get_task_name(task, self)
1406 }
1407}
1408
1409impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1410 #[instrument(level = "info", skip_all, name = "invalidate")]
1411 fn invalidate(&self, task: TaskId) {
1412 self.backend.invalidate_task(task, self);
1413 }
1414
1415 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1416 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1417 {
1418 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1419 reason_set.insert(reason);
1420 }
1421 self.backend.invalidate_task(task, self);
1422 }
1423
1424 fn invalidate_serialization(&self, task: TaskId) {
1425 self.backend.invalidate_serialization(task, self);
1426 }
1427
1428 fn try_read_task_output(
1429 &self,
1430 task: TaskId,
1431 options: ReadOutputOptions,
1432 ) -> Result<Result<RawVc, EventListener>> {
1433 self.backend.try_read_task_output(
1434 task,
1435 current_task_if_available("reading Vcs"),
1436 options,
1437 self,
1438 )
1439 }
1440
1441 fn try_read_task_cell(
1442 &self,
1443 task: TaskId,
1444 index: CellId,
1445 options: ReadCellOptions,
1446 ) -> Result<Result<TypedCellContent, EventListener>> {
1447 self.backend.try_read_task_cell(
1448 task,
1449 index,
1450 current_task_if_available("reading Vcs"),
1451 options,
1452 self,
1453 )
1454 }
1455
1456 fn try_read_own_task_cell(
1457 &self,
1458 current_task: TaskId,
1459 index: CellId,
1460 options: ReadCellOptions,
1461 ) -> Result<TypedCellContent> {
1462 self.backend
1463 .try_read_own_task_cell(current_task, index, options, self)
1464 }
1465
1466 fn try_read_local_output(
1467 &self,
1468 execution_id: ExecutionId,
1469 local_task_id: LocalTaskId,
1470 ) -> Result<Result<RawVc, EventListener>> {
1471 CURRENT_TASK_STATE.with(|gts| {
1472 let gts_read = gts.read().unwrap();
1473
1474 gts_read.assert_execution_id(execution_id);
1479
1480 match gts_read.get_local_task(local_task_id) {
1481 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1482 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1483 }
1484 })
1485 }
1486
1487 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1488 self.backend.read_task_collectibles(
1489 task,
1490 trait_id,
1491 current_task_if_available("reading collectibles"),
1492 self,
1493 )
1494 }
1495
1496 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1497 self.backend.emit_collectible(
1498 trait_type,
1499 collectible,
1500 current_task("emitting collectible"),
1501 self,
1502 );
1503 }
1504
1505 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1506 self.backend.unemit_collectible(
1507 trait_type,
1508 collectible,
1509 count,
1510 current_task("emitting collectible"),
1511 self,
1512 );
1513 }
1514
1515 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1516 for (&collectible, &count) in collectibles {
1517 if count > 0 {
1518 self.backend.unemit_collectible(
1519 trait_type,
1520 collectible,
1521 count as u32,
1522 current_task("emitting collectible"),
1523 self,
1524 );
1525 }
1526 }
1527 }
1528
1529 fn read_own_task_cell(
1530 &self,
1531 task: TaskId,
1532 index: CellId,
1533 options: ReadCellOptions,
1534 ) -> Result<TypedCellContent> {
1535 self.try_read_own_task_cell(task, index, options)
1536 }
1537
1538 fn update_own_task_cell(
1539 &self,
1540 task: TaskId,
1541 index: CellId,
1542 is_serializable_cell_content: bool,
1543 content: CellContent,
1544 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1545 verification_mode: VerificationMode,
1546 ) {
1547 self.backend.update_task_cell(
1548 task,
1549 index,
1550 is_serializable_cell_content,
1551 content,
1552 updated_key_hashes,
1553 verification_mode,
1554 self,
1555 );
1556 }
1557
1558 fn connect_task(&self, task: TaskId) {
1559 self.backend
1560 .connect_task(task, current_task_if_available("connecting task"), self);
1561 }
1562
1563 fn mark_own_task_as_finished(&self, task: TaskId) {
1564 self.backend.mark_own_task_as_finished(task, self);
1565 }
1566
1567 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1568 self.backend
1569 .set_own_task_aggregation_number(task, aggregation_number, self);
1570 }
1571
1572 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1573 self.backend.mark_own_task_as_session_dependent(task, self);
1574 }
1575
1576 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1579 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1582 let fut = tokio::spawn(TURBO_TASKS.scope(
1583 turbo_tasks(),
1584 CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1585 ));
1586 let fut = Box::pin(async move {
1587 fut.await.unwrap();
1588 });
1589 let mut ts = global_task_state.write().unwrap();
1590 ts.local_task_tracker
1591 .get_or_insert_default()
1592 .push(Either::Right(fut));
1593 }
1594
1595 fn task_statistics(&self) -> &TaskStatisticsApi {
1596 self.backend.task_statistics()
1597 }
1598
1599 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1600 let this = self.pin();
1601 Box::pin(async move {
1602 this.stop_and_wait().await;
1603 })
1604 }
1605
1606 fn subscribe_to_compilation_events(
1607 &self,
1608 event_types: Option<Vec<String>>,
1609 ) -> Receiver<Arc<dyn CompilationEvent>> {
1610 self.compilation_events.subscribe(event_types)
1611 }
1612
1613 fn is_tracking_dependencies(&self) -> bool {
1614 self.backend.is_tracking_dependencies()
1615 }
1616}
1617
1618impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1619 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1620 self.pin()
1621 }
1622 fn backend(&self) -> &B {
1623 &self.backend
1624 }
1625
1626 #[track_caller]
1627 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1628 self.schedule_background_job(async move |this| {
1629 this.backend.run_backend_job(job, &*this).await;
1630 this
1631 })
1632 }
1633
1634 #[track_caller]
1635 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1636 self.schedule_foreground_job(async move |this| {
1637 this.backend.run_backend_job(job, &*this).await;
1638 this
1639 })
1640 }
1641
1642 #[track_caller]
1643 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1644 self.schedule(task, priority)
1645 }
1646
1647 fn get_current_task_priority(&self) -> TaskPriority {
1648 CURRENT_TASK_STATE
1649 .try_with(|task_state| task_state.read().unwrap().priority)
1650 .unwrap_or(TaskPriority::initial())
1651 }
1652
1653 fn program_duration_until(&self, instant: Instant) -> Duration {
1654 instant - self.program_start
1655 }
1656
1657 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1658 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1660 }
1661
1662 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1663 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1665 }
1666
1667 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1668 unsafe { self.task_id_factory.reuse(id.into()) }
1669 }
1670
1671 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1672 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1673 }
1674
1675 fn is_idle(&self) -> bool {
1676 self.currently_scheduled_foreground_jobs
1677 .load(Ordering::Acquire)
1678 == 0
1679 }
1680}
1681
1682async fn wait_for_local_tasks() {
1683 while let Some(mut ltt) =
1684 CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1685 {
1686 use futures::StreamExt;
1687 while ltt.next().await.is_some() {}
1688 }
1689}
1690
1691pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1692 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1693 Ok(id) => id,
1694 Err(_) => panic!(
1695 "{from} can only be used in the context of a turbo_tasks task execution or \
1696 turbo_tasks run"
1697 ),
1698 }
1699}
1700
1701pub(crate) fn current_task(from: &str) -> TaskId {
1702 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1703 Ok(Some(id)) => id,
1704 Ok(None) | Err(_) => {
1705 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1706 }
1707 }
1708}
1709
1710pub async fn run<T: Send + 'static>(
1711 tt: Arc<dyn TurboTasksApi>,
1712 future: impl Future<Output = Result<T>> + Send + 'static,
1713) -> Result<T> {
1714 let (tx, rx) = tokio::sync::oneshot::channel();
1715
1716 tt.run(Box::pin(async move {
1717 let result = future.await?;
1718 tx.send(result)
1719 .map_err(|_| anyhow!("unable to send result"))?;
1720 Ok(())
1721 }))
1722 .await?;
1723
1724 Ok(rx.await?)
1725}
1726
1727pub async fn run_once<T: Send + 'static>(
1728 tt: Arc<dyn TurboTasksApi>,
1729 future: impl Future<Output = Result<T>> + Send + 'static,
1730) -> Result<T> {
1731 let (tx, rx) = tokio::sync::oneshot::channel();
1732
1733 tt.run_once(Box::pin(async move {
1734 let result = future.await?;
1735 tx.send(result)
1736 .map_err(|_| anyhow!("unable to send result"))?;
1737 Ok(())
1738 }))
1739 .await?;
1740
1741 Ok(rx.await?)
1742}
1743
1744pub async fn run_once_with_reason<T: Send + 'static>(
1745 tt: Arc<dyn TurboTasksApi>,
1746 reason: impl InvalidationReason,
1747 future: impl Future<Output = Result<T>> + Send + 'static,
1748) -> Result<T> {
1749 let (tx, rx) = tokio::sync::oneshot::channel();
1750
1751 tt.run_once_with_reason(
1752 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1753 Box::pin(async move {
1754 let result = future.await?;
1755 tx.send(result)
1756 .map_err(|_| anyhow!("unable to send result"))?;
1757 Ok(())
1758 }),
1759 )
1760 .await?;
1761
1762 Ok(rx.await?)
1763}
1764
1765pub fn dynamic_call(
1767 func: &'static NativeFunction,
1768 this: Option<RawVc>,
1769 arg: Box<dyn MagicAny>,
1770 persistence: TaskPersistence,
1771) -> RawVc {
1772 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1773}
1774
1775pub fn trait_call(
1777 trait_method: &'static TraitMethod,
1778 this: RawVc,
1779 arg: Box<dyn MagicAny>,
1780 persistence: TaskPersistence,
1781) -> RawVc {
1782 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1783}
1784
1785pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1786 TURBO_TASKS.with(|arc| arc.clone())
1787}
1788
1789pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1790 TURBO_TASKS.with(Arc::downgrade)
1791}
1792
1793pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1794 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1795}
1796
1797pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1798 TURBO_TASKS.with(|arc| func(arc))
1799}
1800
1801pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1802 TURBO_TASKS.sync_scope(tt, f)
1803}
1804
1805pub fn turbo_tasks_future_scope<T>(
1806 tt: Arc<dyn TurboTasksApi>,
1807 f: impl Future<Output = T>,
1808) -> impl Future<Output = T> {
1809 TURBO_TASKS.scope(tt, f)
1810}
1811
1812pub fn with_turbo_tasks_for_testing<T>(
1813 tt: Arc<dyn TurboTasksApi>,
1814 current_task: TaskId,
1815 execution_id: ExecutionId,
1816 f: impl Future<Output = T>,
1817) -> impl Future<Output = T> {
1818 TURBO_TASKS.scope(
1819 tt,
1820 CURRENT_TASK_STATE.scope(
1821 Arc::new(RwLock::new(CurrentTaskState::new(
1822 current_task,
1823 execution_id,
1824 TaskPriority::initial(),
1825 ))),
1826 f,
1827 ),
1828 )
1829}
1830
1831pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1836 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1837}
1838
1839pub fn current_task_for_testing() -> Option<TaskId> {
1840 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1841}
1842
1843pub fn mark_session_dependent() {
1845 with_turbo_tasks(|tt| {
1846 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1847 });
1848}
1849
1850pub fn mark_root() {
1853 with_turbo_tasks(|tt| {
1854 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1855 });
1856}
1857
1858pub fn mark_finished() {
1861 with_turbo_tasks(|tt| {
1862 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1863 });
1864}
1865
1866pub fn get_serialization_invalidator() -> SerializationInvalidator {
1872 CURRENT_TASK_STATE.with(|cell| {
1873 let CurrentTaskState {
1874 task_id,
1875 #[cfg(feature = "verify_determinism")]
1876 stateful,
1877 ..
1878 } = &mut *cell.write().unwrap();
1879 #[cfg(feature = "verify_determinism")]
1880 {
1881 *stateful = true;
1882 }
1883 let Some(task_id) = *task_id else {
1884 panic!(
1885 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1886 task execution"
1887 );
1888 };
1889 SerializationInvalidator::new(task_id)
1890 })
1891}
1892
1893pub fn mark_invalidator() {
1894 CURRENT_TASK_STATE.with(|cell| {
1895 let CurrentTaskState {
1896 has_invalidator, ..
1897 } = &mut *cell.write().unwrap();
1898 *has_invalidator = true;
1899 })
1900}
1901
/// Sets the `stateful` flag in the current task state.
///
/// Does nothing unless the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|cell| {
            cell.write().unwrap().stateful = true;
        });
    }
}
1917
/// Currently a no-op: the function body is empty.
pub fn prevent_gc() {}
1921
1922pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1923 with_turbo_tasks(|tt| {
1924 let raw_vc = collectible.node.node;
1925 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1926 })
1927}
1928
1929pub(crate) async fn read_task_output(
1930 this: &dyn TurboTasksApi,
1931 id: TaskId,
1932 options: ReadOutputOptions,
1933) -> Result<RawVc> {
1934 loop {
1935 match this.try_read_task_output(id, options)? {
1936 Ok(result) => return Ok(result),
1937 Err(listener) => listener.await,
1938 }
1939 }
1940}
1941
/// Handle to one cell of a task: the owning task, the cell's id, and whether the cell content
/// is serializable.
///
/// `find_cell_by_id` creates these for the current task; converting one into a `RawVc` yields
/// `RawVc::TaskCell(current_task, index)`.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// Task that owns the cell; passed as the task argument of the cell read and update calls.
    current_task: TaskId,
    /// Id of the cell (value type id plus a per-type index, see `find_cell_by_id`).
    index: CellId,
    /// Passed through unchanged to the cell read and update calls.
    is_serializable_cell_content: bool,
}
1953
/// Shorthand for the `Target` type of the [`VcRead`] implementation that `T` names as its
/// `Read` type.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
1955
1956impl CurrentCellRef {
1957 fn conditional_update<T>(
1959 &self,
1960 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>)>,
1961 ) where
1962 T: VcValueType,
1963 {
1964 self.conditional_update_with_shared_reference(|old_shared_reference| {
1965 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
1966 let (new_value, updated_key_hashes) = functor(old_ref)?;
1967 Some((
1968 SharedReference::new(triomphe::Arc::new(new_value)),
1969 updated_key_hashes,
1970 ))
1971 })
1972 }
1973
1974 fn conditional_update_with_shared_reference(
1976 &self,
1977 functor: impl FnOnce(
1978 Option<&SharedReference>,
1979 ) -> Option<(SharedReference, Option<SmallVec<[u64; 2]>>)>,
1980 ) {
1981 let tt = turbo_tasks();
1982 let cell_content = tt
1983 .read_own_task_cell(
1984 self.current_task,
1985 self.index,
1986 ReadCellOptions {
1987 tracking: ReadCellTracking::Untracked,
1989 is_serializable_cell_content: self.is_serializable_cell_content,
1990 final_read_hint: false,
1991 },
1992 )
1993 .ok();
1994 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1995 if let Some((update, updated_key_hashes)) = update {
1996 tt.update_own_task_cell(
1997 self.current_task,
1998 self.index,
1999 self.is_serializable_cell_content,
2000 CellContent(Some(update)),
2001 updated_key_hashes,
2002 VerificationMode::EqualityCheck,
2003 )
2004 }
2005 }
2006
2007 pub fn compare_and_update<T>(&self, new_value: T)
2041 where
2042 T: PartialEq + VcValueType,
2043 {
2044 self.conditional_update(|old_value| {
2045 if let Some(old_value) = old_value
2046 && old_value == &new_value
2047 {
2048 return None;
2049 }
2050 Some((new_value, None))
2051 });
2052 }
2053
2054 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2062 where
2063 T: VcValueType + PartialEq,
2064 {
2065 self.conditional_update_with_shared_reference(|old_sr| {
2066 if let Some(old_sr) = old_sr {
2067 let old_value = extract_sr_value::<T>(old_sr);
2068 let new_value = extract_sr_value::<T>(&new_shared_reference);
2069 if old_value == new_value {
2070 return None;
2071 }
2072 }
2073 Some((new_shared_reference, None))
2074 });
2075 }
2076
2077 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2079 where
2080 T: PartialEq + VcValueType,
2081 VcReadTarget<T>: KeyedEq,
2082 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2083 {
2084 self.conditional_update(|old_value| {
2085 let Some(old_value) = old_value else {
2086 return Some((new_value, None));
2087 };
2088 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2089 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2090 let updated_keys = old_value.different_keys(new_value_ref);
2091 if updated_keys.is_empty() {
2092 return None;
2093 }
2094 let updated_key_hashes = updated_keys
2096 .into_iter()
2097 .map(|key| FxBuildHasher.hash_one(key))
2098 .collect();
2099 Some((new_value, Some(updated_key_hashes)))
2100 });
2101 }
2102
2103 pub fn keyed_compare_and_update_with_shared_reference<T>(
2106 &self,
2107 new_shared_reference: SharedReference,
2108 ) where
2109 T: VcValueType + PartialEq,
2110 VcReadTarget<T>: KeyedEq,
2111 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2112 {
2113 self.conditional_update_with_shared_reference(|old_sr| {
2114 let Some(old_sr) = old_sr else {
2115 return Some((new_shared_reference, None));
2116 };
2117 let old_value = extract_sr_value::<T>(old_sr);
2118 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2119 let new_value = extract_sr_value::<T>(&new_shared_reference);
2120 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2121 let updated_keys = old_value.different_keys(new_value);
2122 if updated_keys.is_empty() {
2123 return None;
2124 }
2125 let updated_key_hashes = updated_keys
2127 .into_iter()
2128 .map(|key| FxBuildHasher.hash_one(key))
2129 .collect();
2130 Some((new_shared_reference, Some(updated_key_hashes)))
2131 });
2132 }
2133
2134 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2136 where
2137 T: VcValueType,
2138 {
2139 let tt = turbo_tasks();
2140 tt.update_own_task_cell(
2141 self.current_task,
2142 self.index,
2143 self.is_serializable_cell_content,
2144 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2145 None,
2146 verification_mode,
2147 )
2148 }
2149
2150 pub fn update_with_shared_reference(
2158 &self,
2159 shared_ref: SharedReference,
2160 verification_mode: VerificationMode,
2161 ) {
2162 let tt = turbo_tasks();
2163 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2164 let content = tt
2165 .read_own_task_cell(
2166 self.current_task,
2167 self.index,
2168 ReadCellOptions {
2169 tracking: ReadCellTracking::Untracked,
2171 is_serializable_cell_content: self.is_serializable_cell_content,
2172 final_read_hint: false,
2173 },
2174 )
2175 .ok();
2176 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2177 shared_ref_exp != shared_ref
2179 } else {
2180 true
2181 }
2182 } else {
2183 true
2184 };
2185 if update {
2186 tt.update_own_task_cell(
2187 self.current_task,
2188 self.index,
2189 self.is_serializable_cell_content,
2190 CellContent(Some(shared_ref)),
2191 None,
2192 verification_mode,
2193 )
2194 }
2195 }
2196}
2197
2198impl From<CurrentCellRef> for RawVc {
2199 fn from(cell: CurrentCellRef) -> Self {
2200 RawVc::TaskCell(cell.current_task, cell.index)
2201 }
2202}
2203
2204fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2205 sr.0.downcast_ref::<T>()
2206 .expect("cannot update SharedReference of different type")
2207}
2208
2209pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2210 find_cell_by_id(T::get_value_type_id(), T::has_serialization())
2211}
2212
2213pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2214 CURRENT_TASK_STATE.with(|ts| {
2215 let current_task = current_task("celling turbo_tasks values");
2216 let mut ts = ts.write().unwrap();
2217 let map = ts.cell_counters.as_mut().unwrap();
2218 let current_index = map.entry(ty).or_default();
2219 let index = *current_index;
2220 *current_index += 1;
2221 CurrentCellRef {
2222 current_task,
2223 index: CellId { type_id: ty, index },
2224 is_serializable_cell_content,
2225 }
2226 })
2227}
2228
2229pub(crate) async fn read_local_output(
2230 this: &dyn TurboTasksApi,
2231 execution_id: ExecutionId,
2232 local_task_id: LocalTaskId,
2233) -> Result<RawVc> {
2234 loop {
2235 match this.try_read_local_output(execution_id, local_task_id)? {
2236 Ok(raw_vc) => return Ok(raw_vc),
2237 Err(event_listener) => event_listener.await,
2238 }
2239 }
2240}