1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 pin::Pin,
8 sync::{
9 Arc, Mutex, RwLock, Weak,
10 atomic::{AtomicBool, AtomicUsize, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use bincode::{Decode, Encode};
18use either::Either;
19use futures::stream::FuturesUnordered;
20use rustc_hash::{FxBuildHasher, FxHasher};
21use serde::{Deserialize, Serialize};
22use smallvec::SmallVec;
23use tokio::{select, sync::mpsc::Receiver, task_local};
24use tracing::{Instrument, Span, instrument};
25
26use crate::{
27 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
28 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
29 VcValueTrait, VcValueType,
30 backend::{
31 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
32 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
33 },
34 capture_future::CaptureFuture,
35 event::{Event, EventListener},
36 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
37 id_factory::IdFactoryWithReuse,
38 keyed::Keyed,
39 macro_helpers::NativeFunction,
40 magic_any::MagicAny,
41 message_queue::{CompilationEvent, CompilationEventQueue},
42 priority_runner::{Executor, JoinHandle, PriorityRunner},
43 raw_vc::{CellId, RawVc},
44 registry,
45 serialization_invalidation::SerializationInvalidator,
46 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
47 task_statistics::TaskStatisticsApi,
48 trace::TraceRawVcs,
49 util::{IdFactory, StaticOrArc},
50};
51
/// API for calling turbo-tasks functions and for running top-level futures.
///
/// Every method takes `&self` and only boxed or `'static` arguments, so the trait can be used
/// as a trait object (it is a supertrait of [`TurboTasksApi`], which this file stores as
/// `Arc<dyn TurboTasksApi>`).
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with the optional receiver `this` and the boxed argument `arg`,
    /// returning a [`RawVc`] handle to the result.
    ///
    /// NOTE(review): the `TurboTasks` implementation in this file falls back to a local resolve
    /// task when the inputs are not resolved yet; confirm that this is the intended contract for
    /// every implementor.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` with the optional receiver `this` and the boxed argument `arg`,
    /// returning a [`RawVc`] handle to the result.
    ///
    /// NOTE(review): the `TurboTasks` implementation only reaches this from `dynamic_call` once
    /// the inputs are resolved, so callers presumably must pass resolved inputs — confirm.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls the trait method `trait_method` on the receiver `this` with the boxed argument
    /// `arg`, returning a [`RawVc`] handle to the result.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` and reports failures as a [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` once and resolves with its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`run_once`](Self::run_once), but additionally takes the `reason` for the run.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` without handing back anything to await.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
}
98
/// Type-erased interface to a turbo-tasks instance, on top of [`TurboTasksCallApi`].
///
/// This file stores the instance as `Arc<dyn TurboTasksApi>` in the `TURBO_TASKS` task local.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and passes along the `reason` for the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means "not available yet, await
    /// the listener and retry" — confirm against the implementors.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read the cell `index` of `task`.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means "not available yet" —
    /// confirm.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Tries to read the output of the local task `local_task_id` that belongs to the execution
    /// `execution_id`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of trait `trait_id` for `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Unemits `collectible` for the trait `trait_type`; `count` is presumably the number of
    /// emissions to take back (TODO confirm).
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Unemits every entry of `collectibles` for the trait `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Fallible read of the cell `index` of the task `current_task`.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` of `task`.
    ///
    /// NOTE(review): the "own" in the name suggests `task` must be the currently executing
    /// task — confirm.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Writes `content` into the cell `index` of `task`.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Sets the aggregation number of `task`.
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    /// Marks `task` as session dependent.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task`; presumably to the currently executing task (TODO confirm).
    fn connect_task(&self, task: TaskId);

    /// Spawns `f` detached; intended for tests, as the name says.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task statistics API of this instance.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the instance and resolves once it has stopped.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events; `event_types` presumably filters by type name, with
    /// `None` meaning "all" (TODO confirm).
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;
    /// Sends a compilation event.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Reports whether dependencies are being tracked.
    fn is_tracking_dependencies(&self) -> bool;
}
198
/// Wrapper that tags a value of type `T` as "unused".
///
/// In this file it wraps freshly allocated task ids
/// (`get_fresh_persistent_task_id` / `reuse_persistent_task_id` and their transient
/// counterparts).
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` without any check.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not spelled out here; presumably the caller must
    /// guarantee that `inner` really is unused — confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value while keeping the wrapper intact.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must make sure the value stays unused while the
    /// borrow is alive — confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and hands out the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
228
/// The part of the turbo-tasks manager that is exposed to a [`Backend`] implementation `B`.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owning, type-erased handle to this instance.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Hands out a fresh persistent task id, wrapped in [`Unused`].
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Hands out a fresh transient task id, wrapped in [`Unused`].
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must never have been used — confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must never have been used — confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution with the given `priority`.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the currently executing task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules a backend job as a foreground job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend job as a background job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration up to `instant`; judging by the name it is measured from program
    /// start (TODO confirm).
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Reports whether the instance is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns the backend.
    fn backend(&self) -> &B;
}
268
/// Aggregated statistics, as returned by `TurboTasks::aggregated_update_info`.
///
/// The private placeholder field keeps code outside this module from building the struct with
/// a struct literal, so fields can be added later.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated elapsed time, summed up in `finish_foreground_job`.
    pub duration: Duration,
    /// Accumulated number of scheduled tasks.
    pub tasks: usize,
    /// The invalidation reasons collected since the last report.
    pub reasons: InvalidationReasonSet,
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
277
/// Selects which kind of task a call creates.
///
/// `TurboTasks::native_call` maps the variants to the backend's
/// `get_or_create_persistent_task` and `get_or_create_transient_task` respectively.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// The call creates (or reuses) a persistent task.
    Persistent,

    /// The call creates (or reuses) a transient task.
    Transient,
}
291
292impl Display for TaskPersistence {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 match self {
295 TaskPersistence::Persistent => write!(f, "persistent"),
296 TaskPersistence::Transient => write!(f, "transient"),
297 }
298 }
299}
300
/// Consistency level requested for a read (see `TurboTasks::wait_task_completion`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default level.
    ///
    /// NOTE(review): presumably the read may observe a not-yet-settled value — confirm with the
    /// backend.
    #[default]
    Eventual,
    /// NOTE(review): presumably the read waits until the value is settled — confirm with the
    /// backend.
    Strong,
}
313
/// Describes whether (and how) a cell read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// The read is always tracked.
    Tracked {
        /// Optional key attached to the tracked read.
        key: Option<u64>,
    },
    /// The read is tracked only when it produced an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it ended in an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the key of a [`Tracked`](Self::Tracked) read, `None` for every other variant.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = *self {
            key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// Tracked, without a key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    /// Writes a short human-readable description of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
367
/// Describes whether a read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// The read is always tracked (the default).
    #[default]
    Tracked,
    /// The read is tracked only when it produced an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it ended in an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        let always = matches!(self, Self::Tracked);
        let on_error_only = matches!(self, Self::TrackOnlyError);
        always || (on_error_only && is_err)
    }
}

impl Display for ReadTracking {
    /// Writes a short human-readable description of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
404
/// Priority attached to a scheduled task.
///
/// The derived ordering places `Initial` before every `Invalidation`. Within `Invalidation` the
/// number is wrapped in [`Reverse`], so a larger number compares as *smaller*.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// The default priority; `spawn_root_task`, `spawn_once_task` and `run` use it.
    #[default]
    Initial,
    /// Priority that carries a number, compared in reverse order.
    Invalidation {
        /// The number, wrapped in [`Reverse`] to flip its ordering.
        priority: Reverse<u32>,
    },
}
413
414impl TaskPriority {
415 pub fn invalidation(priority: u32) -> Self {
416 Self::Invalidation {
417 priority: Reverse(priority),
418 }
419 }
420
421 pub fn initial() -> Self {
422 Self::Initial
423 }
424
425 pub fn leaf() -> Self {
426 Self::Invalidation {
427 priority: Reverse(0),
428 }
429 }
430
431 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
432 match self {
433 TaskPriority::Initial => parent_priority,
434 TaskPriority::Invalidation { priority } => {
435 if let TaskPriority::Invalidation {
436 priority: parent_priority,
437 } = parent_priority
438 && priority.0 < parent_priority.0
439 {
440 Self::Invalidation {
441 priority: Reverse(parent_priority.0.saturating_add(1)),
442 }
443 } else {
444 *self
445 }
446 }
447 }
448 }
449}
450
451impl Display for TaskPriority {
452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453 match self {
454 TaskPriority::Initial => write!(f, "initial"),
455 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
456 }
457 }
458}
459
/// A unit of work handed to the [`PriorityRunner`]; `TurboTasksExecutor::execute` turns it into
/// a future.
enum ScheduledTask {
    /// Execution of the task `task_id`.
    Task {
        task_id: TaskId,
        /// The span that was current when the task was scheduled.
        span: Span,
    },
    /// Execution of a local task that belongs to a parent task.
    LocalTask {
        /// What to resolve and with which receiver/argument.
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        /// Slot of this local task in the parent's `CurrentTaskState::local_tasks`.
        local_task_id: LocalTaskId,
        /// State of the task that scheduled this local task; the local task runs inside it.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// The span that was current when the local task was scheduled.
        span: Span,
    },
}
473
/// The turbo-tasks manager: owns the backend `B`, allocates ids, schedules tasks on the priority
/// runner and keeps count of foreground and background jobs.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference, used by `pin` to get an `Arc<Self>` from `&self`.
    this: Weak<Self>,
    backend: B,
    /// Ids from `TaskId::MIN` up to `TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Ids from `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; checked before tasks and jobs are started.
    stopped: AtomicBool,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of tasks scheduled since the foreground job counter last dropped to zero.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Set to the current time whenever the foreground job counter leaves zero.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` plus the collected invalidation reasons.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job counter goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job counter drops back to zero.
    event_foreground_done: Event,
    /// Notified when the background job counter drops back to zero.
    event_background_done: Event,
    /// The time at which this instance was created.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
498
/// Collection of the futures spawned on behalf of a task: either join handles from the priority
/// runner (pushed by `schedule_local_task`) or boxed futures.
///
/// `None` until the first future is inserted.
type LocalTaskTracker = Option<
    FuturesUnordered<Either<JoinHandle, Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>>,
>;
502
/// Per-execution state of the running task, shared through the `CURRENT_TASK_STATE` task local.
struct CurrentTaskState {
    /// The executing task; `None` for the temporary state created by `new_temporary`.
    task_id: Option<TaskId>,
    /// Id of this execution; compared by `assert_execution_id`.
    execution_id: ExecutionId,
    priority: TaskPriority,

    /// Reported to the backend as `has_invalidator` when the task execution completes.
    has_invalidator: bool,

    /// Counters keyed by value type; taken out and handed to the backend on completion.
    /// `None` for the temporary state.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created during this execution, addressed by one-based `LocalTaskId`.
    local_tasks: Vec<LocalTask>,

    /// Futures of the scheduled local tasks, see [`LocalTaskTracker`].
    local_task_tracker: LocalTaskTracker,
}
532
533impl CurrentTaskState {
534 fn new(task_id: TaskId, execution_id: ExecutionId, priority: TaskPriority) -> Self {
535 Self {
536 task_id: Some(task_id),
537 execution_id,
538 priority,
539 has_invalidator: false,
540 cell_counters: Some(AutoMap::default()),
541 local_tasks: Vec::new(),
542 local_task_tracker: None,
543 }
544 }
545
546 fn new_temporary(execution_id: ExecutionId, priority: TaskPriority) -> Self {
547 Self {
548 task_id: None,
549 execution_id,
550 priority,
551 has_invalidator: false,
552 cell_counters: None,
553 local_tasks: Vec::new(),
554 local_task_tracker: None,
555 }
556 }
557
558 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
559 if self.execution_id != expected_execution_id {
560 panic!(
561 "Local tasks can only be scheduled/awaited within the same execution of the \
562 parent task that created them"
563 );
564 }
565 }
566
567 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
568 self.local_tasks.push(local_task);
569 if cfg!(debug_assertions) {
571 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
572 } else {
573 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
574 }
575 }
576
577 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
578 &self.local_tasks[(*local_task_id as usize) - 1]
580 }
581
582 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
583 &mut self.local_tasks[(*local_task_id as usize) - 1]
584 }
585}
586
task_local! {
    /// The turbo-tasks instance the current tokio task runs under; set through
    /// `TURBO_TASKS.scope(..)`.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the current task execution; set through `CURRENT_TASK_STATE.scope(..)`.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}
594
595impl<B: Backend + 'static> TurboTasks<B> {
596 pub fn new(backend: B) -> Arc<Self> {
602 let task_id_factory = IdFactoryWithReuse::new(
603 TaskId::MIN,
604 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
605 );
606 let transient_task_id_factory =
607 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
608 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
609 let this = Arc::new_cyclic(|this| Self {
610 this: this.clone(),
611 backend,
612 task_id_factory,
613 transient_task_id_factory,
614 execution_id_factory,
615 stopped: AtomicBool::new(false),
616 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
617 currently_scheduled_background_jobs: AtomicUsize::new(0),
618 scheduled_tasks: AtomicUsize::new(0),
619 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
620 start: Default::default(),
621 aggregated_update: Default::default(),
622 event_foreground_done: Event::new(|| {
623 || "TurboTasks::event_foreground_done".to_string()
624 }),
625 event_foreground_start: Event::new(|| {
626 || "TurboTasks::event_foreground_start".to_string()
627 }),
628 event_background_done: Event::new(|| {
629 || "TurboTasks::event_background_done".to_string()
630 }),
631 program_start: Instant::now(),
632 compilation_events: CompilationEventQueue::default(),
633 });
634 this.backend.startup(&*this);
635 this
636 }
637
638 pub fn pin(&self) -> Arc<Self> {
639 self.this.upgrade().unwrap()
640 }
641
642 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
644 where
645 T: ?Sized,
646 F: Fn() -> Fut + Send + Sync + Clone + 'static,
647 Fut: Future<Output = Result<Vc<T>>> + Send,
648 {
649 let id = self.backend.create_transient_task(
650 TransientTaskType::Root(Box::new(move || {
651 let functor = functor.clone();
652 Box::pin(async move {
653 let raw_vc = functor().await?.node;
654 raw_vc.to_non_local().await
655 })
656 })),
657 self,
658 );
659 self.schedule(id, TaskPriority::initial());
660 id
661 }
662
    /// Asks the backend to dispose the root task `task_id`.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
666
667 #[track_caller]
671 fn spawn_once_task<T, Fut>(&self, future: Fut)
672 where
673 T: ?Sized,
674 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
675 {
676 let id = self.backend.create_transient_task(
677 TransientTaskType::Once(Box::pin(async move {
678 let raw_vc = future.await?.node;
679 raw_vc.to_non_local().await
680 })),
681 self,
682 );
683 self.schedule(id, TaskPriority::initial());
684 }
685
686 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
687 &self,
688 future: impl Future<Output = Result<T>> + Send + 'static,
689 ) -> Result<T> {
690 let (tx, rx) = tokio::sync::oneshot::channel();
691 self.spawn_once_task(async move {
692 let result = future.await;
693 tx.send(result)
694 .map_err(|_| anyhow!("unable to send result"))?;
695 Ok(Completion::new())
696 });
697
698 rx.await?
699 }
700
701 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
702 pub async fn run<T: TraceRawVcs + Send + 'static>(
703 &self,
704 future: impl Future<Output = Result<T>> + Send + 'static,
705 ) -> Result<T, TurboTasksExecutionError> {
706 self.begin_foreground_job();
707 let execution_id = self.execution_id_factory.wrapping_get();
709 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
710 execution_id,
711 TaskPriority::initial(),
712 )));
713
714 let result = TURBO_TASKS
715 .scope(
716 self.pin(),
717 CURRENT_TASK_STATE.scope(current_task_state, async {
718 let result = CaptureFuture::new(future).await;
719
720 wait_for_local_tasks().await;
722
723 match result {
724 Ok(Ok(raw_vc)) => Ok(raw_vc),
725 Ok(Err(err)) => Err(err.into()),
726 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
727 }
728 }),
729 )
730 .await;
731 self.finish_foreground_job();
732 result
733 }
734
    /// Spawns `future` on tokio, wrapped in a once task (see `run_once`).
    ///
    /// While `future` itself is running, the foreground job counter is lowered by one and
    /// raised again afterwards, so the time spent in `future` is not counted as a foreground
    /// job of the surrounding once task.
    ///
    /// # Panics
    ///
    /// The spawned tokio task panics if `run_once` returns an error.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            this.pin()
                .run_once(async move {
                    // Deliberately "finish" first and "begin" afterwards: the counter is
                    // lowered only for the duration of `future`.
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
749
750 pub(crate) fn native_call(
751 &self,
752 native_fn: &'static NativeFunction,
753 this: Option<RawVc>,
754 arg: Box<dyn MagicAny>,
755 persistence: TaskPersistence,
756 ) -> RawVc {
757 let task_type = CachedTaskType {
758 native_fn,
759 this,
760 arg,
761 };
762 RawVc::TaskOutput(match persistence {
763 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
764 task_type,
765 current_task_if_available("turbo_function calls"),
766 self,
767 ),
768 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
769 task_type,
770 current_task_if_available("turbo_function calls"),
771 self,
772 ),
773 })
774 }
775
776 pub fn dynamic_call(
777 &self,
778 native_fn: &'static NativeFunction,
779 this: Option<RawVc>,
780 arg: Box<dyn MagicAny>,
781 persistence: TaskPersistence,
782 ) -> RawVc {
783 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
784 return self.native_call(native_fn, this, arg, persistence);
785 }
786 let task_type = LocalTaskSpec {
787 task_type: LocalTaskType::ResolveNative { native_fn },
788 this,
789 arg,
790 };
791 self.schedule_local_task(task_type, persistence)
792 }
793
794 pub fn trait_call(
795 &self,
796 trait_method: &'static TraitMethod,
797 this: RawVc,
798 arg: Box<dyn MagicAny>,
799 persistence: TaskPersistence,
800 ) -> RawVc {
801 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
805 match registry::get_value_type(type_id).get_trait_method(trait_method) {
806 Some(native_fn) => {
807 let arg = native_fn.arg_meta.filter_owned(arg);
808 return self.dynamic_call(native_fn, Some(this), arg, persistence);
809 }
810 None => {
811 }
815 }
816 }
817
818 let task_type = LocalTaskSpec {
820 task_type: LocalTaskType::ResolveTrait { trait_method },
821 this: Some(this),
822 arg,
823 };
824
825 self.schedule_local_task(task_type, persistence)
826 }
827
828 #[track_caller]
829 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
830 self.begin_foreground_job();
831 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
832
833 self.priority_runner.schedule(
834 &self.pin(),
835 ScheduledTask::Task {
836 task_id,
837 span: Span::current(),
838 },
839 priority,
840 );
841 }
842
    /// Schedules a local task in the context of the current task and returns a
    /// `RawVc::LocalOutput` that refers to it.
    ///
    /// The local task is registered in the current task state in the `Scheduled` state, handed
    /// to the priority runner with the priority of the current task, and its join handle is
    /// stored in the state's local task tracker.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        // Register the local task and copy out what is needed, all under one write lock.
        let (global_task_state, execution_id, priority, local_task_id) =
            CURRENT_TASK_STATE.with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.execution_id,
                    gts_write.priority,
                    local_task_id,
                )
            });

        // The write lock has been released here, before the runner is invoked.
        let future = self.priority_runner.schedule_with_join_handle(
            &self.pin(),
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state: global_task_state.clone(),
                span: Span::current(),
            },
            priority,
        );
        // Remember the join handle in the tracker, creating the tracker on first use.
        global_task_state
            .write()
            .unwrap()
            .local_task_tracker
            .get_or_insert_default()
            .push(Either::Left(future));

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
886
887 fn begin_foreground_job(&self) {
888 if self
889 .currently_scheduled_foreground_jobs
890 .fetch_add(1, Ordering::AcqRel)
891 == 0
892 {
893 *self.start.lock().unwrap() = Some(Instant::now());
894 self.event_foreground_start.notify(usize::MAX);
895 self.backend.idle_end(self);
896 }
897 }
898
899 fn finish_foreground_job(&self) {
900 if self
901 .currently_scheduled_foreground_jobs
902 .fetch_sub(1, Ordering::AcqRel)
903 == 1
904 {
905 self.backend.idle_start(self);
906 let total = self.scheduled_tasks.load(Ordering::Acquire);
909 self.scheduled_tasks.store(0, Ordering::Release);
910 if let Some(start) = *self.start.lock().unwrap() {
911 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
912 if let Some(update) = update.as_mut() {
913 update.0 += start.elapsed();
914 update.1 += total;
915 } else {
916 *update = Some((start.elapsed(), total));
917 }
918 }
919 self.event_foreground_done.notify(usize::MAX);
920 }
921 }
922
923 fn begin_background_job(&self) {
924 self.currently_scheduled_background_jobs
925 .fetch_add(1, Ordering::Relaxed);
926 }
927
928 fn finish_background_job(&self) {
929 if self
930 .currently_scheduled_background_jobs
931 .fetch_sub(1, Ordering::Relaxed)
932 == 1
933 {
934 self.event_background_done.notify(usize::MAX);
935 }
936 }
937
    /// Returns the current value of the foreground job counter.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
942
943 pub async fn wait_task_completion(
955 &self,
956 id: TaskId,
957 consistency: ReadConsistency,
958 ) -> Result<()> {
959 read_task_output(
960 self,
961 id,
962 ReadOutputOptions {
963 tracking: ReadTracking::Untracked,
965 consistency,
966 },
967 )
968 .await?;
969 Ok(())
970 }
971
    /// Like [`aggregated_update_info`](Self::aggregated_update_info) without a timeout
    /// (`Duration::MAX`), so it always yields an [`UpdateInfo`].
    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
        // Without a timeout the `None` (timed out) result cannot occur.
        self.aggregated_update_info(aggregation, Duration::MAX)
            .await
            .unwrap()
    }
979
    /// Takes the aggregated update statistics, waiting for them if necessary.
    ///
    /// * `aggregation`: when non-zero, the method keeps waiting until no foreground batch has
    ///   finished for this long. When zero, already available statistics are returned at once.
    /// * `timeout`: `Duration::MAX` waits indefinitely. Otherwise the method waits for
    ///   foreground work to start (if none is running), then waits at most `timeout` for it to
    ///   finish. It returns `None` on timeout.
    ///
    /// # Panics
    ///
    /// Panics if the statistics are gone when they are finally taken. The panic message
    /// attributes this to concurrent calls of this method.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        // The listener is registered before the state is inspected.
        let listener = self
            .event_foreground_done
            .listen_with_note(|| || "wait for update info".to_string());
        let wait_for_finish = {
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                if let Some((duration, tasks)) = update.take() {
                    // Nothing to aggregate: hand out what is there.
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                // Wait for the foreground work to finish.
                listener.await;
            } else {
                // Wait for foreground work to start, then for it to finish or time out.
                let start_listener = self
                    .event_foreground_start
                    .listen_with_note(|| || "wait for update info".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    == 0
                {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    // Timed out.
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                        // Another batch finished; the next loop iteration starts a new sleep.
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not called concurrently")
        }
    }
1056
1057 pub async fn wait_background_done(&self) {
1058 let listener = self.event_background_done.listen();
1059 if self
1060 .currently_scheduled_background_jobs
1061 .load(Ordering::Acquire)
1062 != 0
1063 {
1064 listener.await;
1065 }
1066 }
1067
1068 pub async fn stop_and_wait(&self) {
1069 turbo_tasks_future_scope(self.pin(), async move {
1070 self.backend.stopping(self);
1071 self.stopped.store(true, Ordering::Release);
1072 {
1073 let listener = self
1074 .event_foreground_done
1075 .listen_with_note(|| || "wait for stop".to_string());
1076 if self
1077 .currently_scheduled_foreground_jobs
1078 .load(Ordering::Acquire)
1079 != 0
1080 {
1081 listener.await;
1082 }
1083 }
1084 {
1085 let listener = self.event_background_done.listen();
1086 if self
1087 .currently_scheduled_background_jobs
1088 .load(Ordering::Acquire)
1089 != 0
1090 {
1091 listener.await;
1092 }
1093 }
1094 self.backend.stop(self);
1095 })
1096 .await;
1097 }
1098
1099 #[track_caller]
1100 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1101 where
1102 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1103 T::CallOnceFuture: Send,
1104 {
1105 let mut this = self.pin();
1106 this.begin_foreground_job();
1107 tokio::spawn(
1108 TURBO_TASKS
1109 .scope(this.clone(), async move {
1110 if !this.stopped.load(Ordering::Acquire) {
1111 this = func(this.clone()).await;
1112 }
1113 this.finish_foreground_job();
1114 })
1115 .in_current_span(),
1116 );
1117 }
1118
1119 #[track_caller]
1120 pub(crate) fn schedule_background_job<T>(&self, func: T)
1121 where
1122 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1123 T::CallOnceFuture: Send,
1124 {
1125 let mut this = self.pin();
1126 self.begin_background_job();
1127 tokio::spawn(
1128 TURBO_TASKS
1129 .scope(this.clone(), async move {
1130 if !this.stopped.load(Ordering::Acquire) {
1131 this = func(this).await;
1132 }
1133 this.finish_background_job();
1134 })
1135 .in_current_span(),
1136 );
1137 }
1138
1139 fn finish_current_task_state(&self) -> FinishedTaskState {
1140 let has_invalidator = CURRENT_TASK_STATE.with(|cell| {
1141 let CurrentTaskState {
1142 has_invalidator, ..
1143 } = &mut *cell.write().unwrap();
1144 *has_invalidator
1145 });
1146
1147 FinishedTaskState { has_invalidator }
1148 }
1149
    /// Returns a reference to the backend.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1153}
1154
/// Stateless [`Executor`] that turns a [`ScheduledTask`] into the future the
/// [`PriorityRunner`] drives.
struct TurboTasksExecutor;

impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future for `scheduled_task`.
    ///
    /// * `Task`: executes the task repeatedly for as long as the backend's
    ///   `task_execution_completed` asks for it, each time with a fresh execution id and task
    ///   state, then finishes the foreground job that `schedule` began.
    /// * `LocalTask`: resolves the native function or trait method inside the task state of
    ///   the parent, stores the output in the parent's local task slot and notifies the slot's
    ///   `done_event`.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle goes into the `TURBO_TASKS` scope, the other into the future.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    let mut schedule_again = true;
                    while schedule_again {
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                        )));
                        // Resolves to `true` when the task has to be executed once more.
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return false;
                            }

                            // `None` means the backend did not start an execution.
                            let Some(TaskExecutionSpec { future, span }) = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)
                            else {
                                return false;
                            };

                            async {
                                let result = CaptureFuture::new(future).await;

                                // Wait for the local tasks spawned during this execution.
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let FinishedTaskState { has_invalidator } =
                                    this.finish_current_task_state();
                                // `CurrentTaskState::new` always sets the counters, so the
                                // `unwrap` holds as long as they are taken only once.
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        schedule_again = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await;
                    }
                    // Counterpart of the `begin_foreground_job` in `TurboTasks::schedule`.
                    this.finish_foreground_job();
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    async move {
                        let result = match ty.task_type {
                            LocalTaskType::ResolveNative { native_fn } => {
                                LocalTaskType::run_resolve_native(
                                    native_fn,
                                    ty.this,
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                            LocalTaskType::ResolveTrait { trait_method } => {
                                LocalTaskType::run_resolve_trait(
                                    trait_method,
                                    ty.this.unwrap(),
                                    &*ty.arg,
                                    persistence,
                                    this,
                                )
                                .await
                            }
                        };

                        let output = match result {
                            Ok(raw_vc) => OutputContent::Link(raw_vc),
                            Err(err) => OutputContent::Error(
                                TurboTasksExecutionError::from(err)
                                    .with_task_context(task_type, None),
                            ),
                        };

                        let local_task = LocalTask::Done { output };

                        // Swap the slot from `Scheduled` to `Done` and take out the event that
                        // waiters listen on.
                        let done_event = CURRENT_TASK_STATE.with(move |gts| {
                            let mut gts_write = gts.write().unwrap();
                            let scheduled_task = std::mem::replace(
                                gts_write.get_mut_local_task(local_task_id),
                                local_task,
                            );
                            let LocalTask::Scheduled { done_event } = scheduled_task else {
                                panic!("local task finished, but was not in the scheduled state?");
                            };
                            done_event
                        });
                        // Notify after the write lock has been released.
                        done_event.notify(usize::MAX)
                    }
                    .instrument(span)
                    .await
                };
                // Run inside the task state of the task that scheduled this local task.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1306
/// The values copied out of `CurrentTaskState` when a task execution completes.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::has_invalidator`; passed to the backend's
    /// `task_execution_completed`.
    has_invalidator: bool,
}
1311
/// Trait-object-friendly front for the inherent methods of [`TurboTasks`].
///
/// The method calls in the bodies resolve to the inherent methods of the same name (inherent
/// methods take precedence over trait methods), so none of these wrappers recurse.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to the inherent `TurboTasks::dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `TurboTasks::native_call`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `TurboTasks::trait_call`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    /// Boxed variant of the inherent `TurboTasks::run`; the returned future owns an
    /// `Arc<Self>`.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Boxed variant of the inherent `TurboTasks::run_once`; the returned future owns an
    /// `Arc<Self>`.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Records `reason` in the aggregated reason set, then behaves like `run_once`.
    ///
    /// The reason is recorded right away, when this method is called, not when the returned
    /// future is polled.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // The lock is held only for the insertion.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Forwards to the inherent `TurboTasks::start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }
}
1378
1379impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1380 #[instrument(level = "info", skip_all, name = "invalidate")]
1381 fn invalidate(&self, task: TaskId) {
1382 self.backend.invalidate_task(task, self);
1383 }
1384
1385 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1386 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1387 {
1388 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1389 reason_set.insert(reason);
1390 }
1391 self.backend.invalidate_task(task, self);
1392 }
1393
1394 fn invalidate_serialization(&self, task: TaskId) {
1395 self.backend.invalidate_serialization(task, self);
1396 }
1397
1398 fn try_read_task_output(
1399 &self,
1400 task: TaskId,
1401 options: ReadOutputOptions,
1402 ) -> Result<Result<RawVc, EventListener>> {
1403 self.backend.try_read_task_output(
1404 task,
1405 current_task_if_available("reading Vcs"),
1406 options,
1407 self,
1408 )
1409 }
1410
1411 fn try_read_task_cell(
1412 &self,
1413 task: TaskId,
1414 index: CellId,
1415 options: ReadCellOptions,
1416 ) -> Result<Result<TypedCellContent, EventListener>> {
1417 self.backend.try_read_task_cell(
1418 task,
1419 index,
1420 current_task_if_available("reading Vcs"),
1421 options,
1422 self,
1423 )
1424 }
1425
1426 fn try_read_own_task_cell(
1427 &self,
1428 current_task: TaskId,
1429 index: CellId,
1430 options: ReadCellOptions,
1431 ) -> Result<TypedCellContent> {
1432 self.backend
1433 .try_read_own_task_cell(current_task, index, options, self)
1434 }
1435
1436 fn try_read_local_output(
1437 &self,
1438 execution_id: ExecutionId,
1439 local_task_id: LocalTaskId,
1440 ) -> Result<Result<RawVc, EventListener>> {
1441 CURRENT_TASK_STATE.with(|gts| {
1442 let gts_read = gts.read().unwrap();
1443
1444 gts_read.assert_execution_id(execution_id);
1449
1450 match gts_read.get_local_task(local_task_id) {
1451 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1452 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1453 }
1454 })
1455 }
1456
1457 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1458 self.backend.read_task_collectibles(
1459 task,
1460 trait_id,
1461 current_task_if_available("reading collectibles"),
1462 self,
1463 )
1464 }
1465
1466 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1467 self.backend.emit_collectible(
1468 trait_type,
1469 collectible,
1470 current_task("emitting collectible"),
1471 self,
1472 );
1473 }
1474
1475 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1476 self.backend.unemit_collectible(
1477 trait_type,
1478 collectible,
1479 count,
1480 current_task("emitting collectible"),
1481 self,
1482 );
1483 }
1484
1485 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1486 for (&collectible, &count) in collectibles {
1487 if count > 0 {
1488 self.backend.unemit_collectible(
1489 trait_type,
1490 collectible,
1491 count as u32,
1492 current_task("emitting collectible"),
1493 self,
1494 );
1495 }
1496 }
1497 }
1498
1499 fn read_own_task_cell(
1500 &self,
1501 task: TaskId,
1502 index: CellId,
1503 options: ReadCellOptions,
1504 ) -> Result<TypedCellContent> {
1505 self.try_read_own_task_cell(task, index, options)
1506 }
1507
1508 fn update_own_task_cell(
1509 &self,
1510 task: TaskId,
1511 index: CellId,
1512 is_serializable_cell_content: bool,
1513 content: CellContent,
1514 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1515 verification_mode: VerificationMode,
1516 ) {
1517 self.backend.update_task_cell(
1518 task,
1519 index,
1520 is_serializable_cell_content,
1521 content,
1522 updated_key_hashes,
1523 verification_mode,
1524 self,
1525 );
1526 }
1527
1528 fn connect_task(&self, task: TaskId) {
1529 self.backend
1530 .connect_task(task, current_task_if_available("connecting task"), self);
1531 }
1532
1533 fn mark_own_task_as_finished(&self, task: TaskId) {
1534 self.backend.mark_own_task_as_finished(task, self);
1535 }
1536
1537 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1538 self.backend
1539 .set_own_task_aggregation_number(task, aggregation_number, self);
1540 }
1541
1542 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1543 self.backend.mark_own_task_as_session_dependent(task, self);
1544 }
1545
1546 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1549 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1552 let fut = tokio::spawn(TURBO_TASKS.scope(
1553 turbo_tasks(),
1554 CURRENT_TASK_STATE.scope(global_task_state.clone(), fut),
1555 ));
1556 let fut = Box::pin(async move {
1557 fut.await.unwrap();
1558 });
1559 let mut ts = global_task_state.write().unwrap();
1560 ts.local_task_tracker
1561 .get_or_insert_default()
1562 .push(Either::Right(fut));
1563 }
1564
1565 fn task_statistics(&self) -> &TaskStatisticsApi {
1566 self.backend.task_statistics()
1567 }
1568
1569 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1570 let this = self.pin();
1571 Box::pin(async move {
1572 this.stop_and_wait().await;
1573 })
1574 }
1575
1576 fn subscribe_to_compilation_events(
1577 &self,
1578 event_types: Option<Vec<String>>,
1579 ) -> Receiver<Arc<dyn CompilationEvent>> {
1580 self.compilation_events.subscribe(event_types)
1581 }
1582
1583 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1584 if let Err(e) = self.compilation_events.send(event) {
1585 tracing::warn!("Failed to send compilation event: {e}");
1586 }
1587 }
1588
1589 fn is_tracking_dependencies(&self) -> bool {
1590 self.backend.is_tracking_dependencies()
1591 }
1592}
1593
1594impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1595 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1596 self.pin()
1597 }
1598 fn backend(&self) -> &B {
1599 &self.backend
1600 }
1601
1602 #[track_caller]
1603 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1604 self.schedule_background_job(async move |this| {
1605 this.backend.run_backend_job(job, &*this).await;
1606 this
1607 })
1608 }
1609
1610 #[track_caller]
1611 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1612 self.schedule_foreground_job(async move |this| {
1613 this.backend.run_backend_job(job, &*this).await;
1614 this
1615 })
1616 }
1617
1618 #[track_caller]
1619 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1620 self.schedule(task, priority)
1621 }
1622
1623 fn get_current_task_priority(&self) -> TaskPriority {
1624 CURRENT_TASK_STATE
1625 .try_with(|task_state| task_state.read().unwrap().priority)
1626 .unwrap_or(TaskPriority::initial())
1627 }
1628
1629 fn program_duration_until(&self, instant: Instant) -> Duration {
1630 instant - self.program_start
1631 }
1632
1633 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1634 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1636 }
1637
1638 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1639 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1641 }
1642
1643 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1644 unsafe { self.task_id_factory.reuse(id.into()) }
1645 }
1646
1647 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1648 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1649 }
1650
1651 fn is_idle(&self) -> bool {
1652 self.currently_scheduled_foreground_jobs
1653 .load(Ordering::Acquire)
1654 == 0
1655 }
1656}
1657
1658async fn wait_for_local_tasks() {
1659 while let Some(mut ltt) =
1660 CURRENT_TASK_STATE.with(|ts| ts.write().unwrap().local_task_tracker.take())
1661 {
1662 use futures::StreamExt;
1663 while ltt.next().await.is_some() {}
1664 }
1665}
1666
1667pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1668 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1669 Ok(id) => id,
1670 Err(_) => panic!(
1671 "{from} can only be used in the context of a turbo_tasks task execution or \
1672 turbo_tasks run"
1673 ),
1674 }
1675}
1676
1677pub(crate) fn current_task(from: &str) -> TaskId {
1678 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1679 Ok(Some(id)) => id,
1680 Ok(None) | Err(_) => {
1681 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1682 }
1683 }
1684}
1685
1686pub async fn run<T: Send + 'static>(
1687 tt: Arc<dyn TurboTasksApi>,
1688 future: impl Future<Output = Result<T>> + Send + 'static,
1689) -> Result<T> {
1690 let (tx, rx) = tokio::sync::oneshot::channel();
1691
1692 tt.run(Box::pin(async move {
1693 let result = future.await?;
1694 tx.send(result)
1695 .map_err(|_| anyhow!("unable to send result"))?;
1696 Ok(())
1697 }))
1698 .await?;
1699
1700 Ok(rx.await?)
1701}
1702
1703pub async fn run_once<T: Send + 'static>(
1704 tt: Arc<dyn TurboTasksApi>,
1705 future: impl Future<Output = Result<T>> + Send + 'static,
1706) -> Result<T> {
1707 let (tx, rx) = tokio::sync::oneshot::channel();
1708
1709 tt.run_once(Box::pin(async move {
1710 let result = future.await?;
1711 tx.send(result)
1712 .map_err(|_| anyhow!("unable to send result"))?;
1713 Ok(())
1714 }))
1715 .await?;
1716
1717 Ok(rx.await?)
1718}
1719
1720pub async fn run_once_with_reason<T: Send + 'static>(
1721 tt: Arc<dyn TurboTasksApi>,
1722 reason: impl InvalidationReason,
1723 future: impl Future<Output = Result<T>> + Send + 'static,
1724) -> Result<T> {
1725 let (tx, rx) = tokio::sync::oneshot::channel();
1726
1727 tt.run_once_with_reason(
1728 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1729 Box::pin(async move {
1730 let result = future.await?;
1731 tx.send(result)
1732 .map_err(|_| anyhow!("unable to send result"))?;
1733 Ok(())
1734 }),
1735 )
1736 .await?;
1737
1738 Ok(rx.await?)
1739}
1740
1741pub fn dynamic_call(
1743 func: &'static NativeFunction,
1744 this: Option<RawVc>,
1745 arg: Box<dyn MagicAny>,
1746 persistence: TaskPersistence,
1747) -> RawVc {
1748 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1749}
1750
1751pub fn trait_call(
1753 trait_method: &'static TraitMethod,
1754 this: RawVc,
1755 arg: Box<dyn MagicAny>,
1756 persistence: TaskPersistence,
1757) -> RawVc {
1758 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1759}
1760
1761pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1762 TURBO_TASKS.with(|arc| arc.clone())
1763}
1764
1765pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1766 TURBO_TASKS.with(Arc::downgrade)
1767}
1768
1769pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1770 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1771}
1772
1773pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1774 TURBO_TASKS.with(|arc| func(arc))
1775}
1776
1777pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1778 TURBO_TASKS.sync_scope(tt, f)
1779}
1780
1781pub fn turbo_tasks_future_scope<T>(
1782 tt: Arc<dyn TurboTasksApi>,
1783 f: impl Future<Output = T>,
1784) -> impl Future<Output = T> {
1785 TURBO_TASKS.scope(tt, f)
1786}
1787
1788pub fn with_turbo_tasks_for_testing<T>(
1789 tt: Arc<dyn TurboTasksApi>,
1790 current_task: TaskId,
1791 execution_id: ExecutionId,
1792 f: impl Future<Output = T>,
1793) -> impl Future<Output = T> {
1794 TURBO_TASKS.scope(
1795 tt,
1796 CURRENT_TASK_STATE.scope(
1797 Arc::new(RwLock::new(CurrentTaskState::new(
1798 current_task,
1799 execution_id,
1800 TaskPriority::initial(),
1801 ))),
1802 f,
1803 ),
1804 )
1805}
1806
1807pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1812 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1813}
1814
1815pub fn current_task_for_testing() -> Option<TaskId> {
1816 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1817}
1818
1819pub fn mark_session_dependent() {
1821 with_turbo_tasks(|tt| {
1822 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1823 });
1824}
1825
1826pub fn mark_root() {
1829 with_turbo_tasks(|tt| {
1830 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1831 });
1832}
1833
1834pub fn mark_finished() {
1837 with_turbo_tasks(|tt| {
1838 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1839 });
1840}
1841
1842pub fn get_serialization_invalidator() -> SerializationInvalidator {
1845 CURRENT_TASK_STATE.with(|cell| {
1846 let CurrentTaskState { task_id, .. } = &mut *cell.write().unwrap();
1847 let Some(task_id) = *task_id else {
1848 panic!(
1849 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1850 task execution"
1851 );
1852 };
1853 SerializationInvalidator::new(task_id)
1854 })
1855}
1856
1857pub fn mark_invalidator() {
1858 CURRENT_TASK_STATE.with(|cell| {
1859 let CurrentTaskState {
1860 has_invalidator, ..
1861 } = &mut *cell.write().unwrap();
1862 *has_invalidator = true;
1863 })
1864}
1865
/// Currently does nothing: the body is empty.
pub fn prevent_gc() {}
1869
1870pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1871 with_turbo_tasks(|tt| {
1872 let raw_vc = collectible.node.node;
1873 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1874 })
1875}
1876
1877pub(crate) async fn read_task_output(
1878 this: &dyn TurboTasksApi,
1879 id: TaskId,
1880 options: ReadOutputOptions,
1881) -> Result<RawVc> {
1882 loop {
1883 match this.try_read_task_output(id, options)? {
1884 Ok(result) => return Ok(result),
1885 Err(listener) => listener.await,
1886 }
1887 }
1888}
1889
1890pub(crate) async fn read_task_cell(
1891 this: &dyn TurboTasksApi,
1892 id: TaskId,
1893 index: CellId,
1894 options: ReadCellOptions,
1895) -> Result<TypedCellContent> {
1896 loop {
1897 match this.try_read_task_cell(id, index, options)? {
1898 Ok(result) => return Ok(result),
1899 Err(listener) => listener.await,
1900 }
1901 }
1902}
1903
1904#[derive(Clone, Copy)]
1910pub struct CurrentCellRef {
1911 current_task: TaskId,
1912 index: CellId,
1913 is_serializable_cell_content: bool,
1914}
1915
1916type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
1917
1918impl CurrentCellRef {
1919 fn conditional_update<T>(
1921 &self,
1922 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>)>,
1923 ) where
1924 T: VcValueType,
1925 {
1926 self.conditional_update_with_shared_reference(|old_shared_reference| {
1927 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
1928 let (new_value, updated_key_hashes) = functor(old_ref)?;
1929 Some((
1930 SharedReference::new(triomphe::Arc::new(new_value)),
1931 updated_key_hashes,
1932 ))
1933 })
1934 }
1935
1936 fn conditional_update_with_shared_reference(
1938 &self,
1939 functor: impl FnOnce(
1940 Option<&SharedReference>,
1941 ) -> Option<(SharedReference, Option<SmallVec<[u64; 2]>>)>,
1942 ) {
1943 let tt = turbo_tasks();
1944 let cell_content = tt
1945 .read_own_task_cell(
1946 self.current_task,
1947 self.index,
1948 ReadCellOptions {
1949 tracking: ReadCellTracking::Untracked,
1951 is_serializable_cell_content: self.is_serializable_cell_content,
1952 final_read_hint: false,
1953 },
1954 )
1955 .ok();
1956 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1957 if let Some((update, updated_key_hashes)) = update {
1958 tt.update_own_task_cell(
1959 self.current_task,
1960 self.index,
1961 self.is_serializable_cell_content,
1962 CellContent(Some(update)),
1963 updated_key_hashes,
1964 VerificationMode::EqualityCheck,
1965 )
1966 }
1967 }
1968
1969 pub fn compare_and_update<T>(&self, new_value: T)
2003 where
2004 T: PartialEq + VcValueType,
2005 {
2006 self.conditional_update(|old_value| {
2007 if let Some(old_value) = old_value
2008 && old_value == &new_value
2009 {
2010 return None;
2011 }
2012 Some((new_value, None))
2013 });
2014 }
2015
2016 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2024 where
2025 T: VcValueType + PartialEq,
2026 {
2027 self.conditional_update_with_shared_reference(|old_sr| {
2028 if let Some(old_sr) = old_sr {
2029 let old_value = extract_sr_value::<T>(old_sr);
2030 let new_value = extract_sr_value::<T>(&new_shared_reference);
2031 if old_value == new_value {
2032 return None;
2033 }
2034 }
2035 Some((new_shared_reference, None))
2036 });
2037 }
2038
2039 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2041 where
2042 T: PartialEq + VcValueType,
2043 VcReadTarget<T>: Keyed,
2044 <VcReadTarget<T> as Keyed>::Key: std::hash::Hash,
2045 {
2046 self.conditional_update(|old_value| {
2047 let Some(old_value) = old_value else {
2048 return Some((new_value, None));
2049 };
2050 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2051 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2052 let updated_keys = old_value.different_keys(new_value_ref);
2053 if updated_keys.is_empty() {
2054 return None;
2055 }
2056 let updated_key_hashes = updated_keys
2058 .into_iter()
2059 .map(|key| FxBuildHasher.hash_one(key))
2060 .collect();
2061 Some((new_value, Some(updated_key_hashes)))
2062 });
2063 }
2064
2065 pub fn keyed_compare_and_update_with_shared_reference<T>(
2068 &self,
2069 new_shared_reference: SharedReference,
2070 ) where
2071 T: VcValueType + PartialEq,
2072 VcReadTarget<T>: Keyed,
2073 <VcReadTarget<T> as Keyed>::Key: std::hash::Hash,
2074 {
2075 self.conditional_update_with_shared_reference(|old_sr| {
2076 let Some(old_sr) = old_sr else {
2077 return Some((new_shared_reference, None));
2078 };
2079 let old_value = extract_sr_value::<T>(old_sr);
2080 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2081 let new_value = extract_sr_value::<T>(&new_shared_reference);
2082 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2083 let updated_keys = old_value.different_keys(new_value);
2084 if updated_keys.is_empty() {
2085 return None;
2086 }
2087 let updated_key_hashes = updated_keys
2089 .into_iter()
2090 .map(|key| FxBuildHasher.hash_one(key))
2091 .collect();
2092 Some((new_shared_reference, Some(updated_key_hashes)))
2093 });
2094 }
2095
2096 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2098 where
2099 T: VcValueType,
2100 {
2101 let tt = turbo_tasks();
2102 tt.update_own_task_cell(
2103 self.current_task,
2104 self.index,
2105 self.is_serializable_cell_content,
2106 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2107 None,
2108 verification_mode,
2109 )
2110 }
2111
2112 pub fn update_with_shared_reference(
2120 &self,
2121 shared_ref: SharedReference,
2122 verification_mode: VerificationMode,
2123 ) {
2124 let tt = turbo_tasks();
2125 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2126 let content = tt
2127 .read_own_task_cell(
2128 self.current_task,
2129 self.index,
2130 ReadCellOptions {
2131 tracking: ReadCellTracking::Untracked,
2133 is_serializable_cell_content: self.is_serializable_cell_content,
2134 final_read_hint: false,
2135 },
2136 )
2137 .ok();
2138 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2139 shared_ref_exp != shared_ref
2141 } else {
2142 true
2143 }
2144 } else {
2145 true
2146 };
2147 if update {
2148 tt.update_own_task_cell(
2149 self.current_task,
2150 self.index,
2151 self.is_serializable_cell_content,
2152 CellContent(Some(shared_ref)),
2153 None,
2154 verification_mode,
2155 )
2156 }
2157 }
2158}
2159
2160impl From<CurrentCellRef> for RawVc {
2161 fn from(cell: CurrentCellRef) -> Self {
2162 RawVc::TaskCell(cell.current_task, cell.index)
2163 }
2164}
2165
2166fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2167 sr.0.downcast_ref::<T>()
2168 .expect("cannot update SharedReference of different type")
2169}
2170
2171pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2172 find_cell_by_id(T::get_value_type_id(), T::has_serialization())
2173}
2174
2175pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
2176 CURRENT_TASK_STATE.with(|ts| {
2177 let current_task = current_task("celling turbo_tasks values");
2178 let mut ts = ts.write().unwrap();
2179 let map = ts.cell_counters.as_mut().unwrap();
2180 let current_index = map.entry(ty).or_default();
2181 let index = *current_index;
2182 *current_index += 1;
2183 CurrentCellRef {
2184 current_task,
2185 index: CellId { type_id: ty, index },
2186 is_serializable_cell_content,
2187 }
2188 })
2189}
2190
2191pub(crate) async fn read_local_output(
2192 this: &dyn TurboTasksApi,
2193 execution_id: ExecutionId,
2194 local_task_id: LocalTaskId,
2195) -> Result<RawVc> {
2196 loop {
2197 match this.try_read_local_output(execution_id, local_task_id)? {
2198 Ok(raw_vc) => return Ok(raw_vc),
2199 Err(event_listener) => event_listener.await,
2200 }
2201 }
2202}