1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 panic::AssertUnwindSafe,
8 pin::Pin,
9 process::abort,
10 sync::{
11 Arc, Mutex, RwLock, Weak,
12 atomic::{AtomicBool, AtomicUsize, Ordering},
13 },
14 time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30 CellId, Completion, InvalidationReason, InvalidationReasonSet, OutputContent, RawVc,
31 ReadCellOptions, ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod,
32 ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType,
33 backend::{
34 Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35 TurboTasksExecutionError, TypedCellContent, VerificationMode,
36 },
37 capture_future::CaptureFuture,
38 dyn_task_inputs::DynTaskInputsStorage,
39 event::{Event, EventListener},
40 id::{ExecutionId, LocalTaskId, TraitTypeId},
41 keyed::KeyedEq,
42 local_task_tracker::LocalTaskTracker,
43 macro_helpers::NativeFunction,
44 message_queue::{CompilationEvent, CompilationEventQueue},
45 priority_runner::{Executor, PriorityRunner},
46 registry,
47 serialization_invalidation::SerializationInvalidator,
48 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
49 task_statistics::TaskStatisticsApi,
50 trace::TraceRawVcs,
51 util::{IdFactory, StaticOrArc},
52};
53
/// Object-safe interface for issuing turbo-tasks function calls and for driving top-level
/// futures.
///
/// In this file the trait is implemented by [`TurboTasks`], whose trait methods forward to the
/// inherent methods of the same name.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with the optional receiver `this` and the arguments in `arg`, returning
    /// a [`RawVc`] for the result.
    ///
    /// In the [`TurboTasks`] implementation the call is forwarded to `native_call` when
    /// `inputs_resolved` is resolved and `this` is either absent or resolved; otherwise a local
    /// resolve task is scheduled.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` without any resolution step.
    ///
    /// In the [`TurboTasks`] implementation this asks the backend to get or create the task and
    /// wraps the resulting task id in `RawVc::TaskOutput`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on the receiver `this`.
    ///
    /// In the [`TurboTasks`] implementation, a `RawVc::TaskCell` receiver whose value type
    /// implements the method is dispatched through `dynamic_call`; every other case schedules a
    /// local `ResolveTrait` task.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` and returns a boxed future yielding its outcome as a
    /// `Result<(), TurboTasksExecutionError>`.
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` once and returns a boxed future yielding its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`run_once`](Self::run_once), but additionally takes a `reason`.
    ///
    /// In the [`TurboTasks`] implementation the reason is inserted into the aggregated update's
    /// reason set before the future is run.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` without returning a handle or a result to the caller.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event.
    ///
    /// In the [`TurboTasks`] implementation a send failure is logged as a warning and otherwise
    /// ignored.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a name for `task` as a `String`.
    fn get_task_name(&self, task: TaskId) -> String;
}
116
/// Extension of [`TurboTasksCallApi`] with invalidation, reading, collectible and lifecycle
/// operations.
///
/// In this file the trait is used as `Arc<dyn TurboTasksApi>` in the `TURBO_TASKS` task-local.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and supplies a `reason` for the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means "not available yet, await
    /// this listener and retry" — confirm against the backend.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read the cell `index` of `task`.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means "not available yet" —
    /// confirm against the backend.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Tries to read the output of the local task `local_task_id` that belongs to the execution
    /// `execution_id`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of trait `trait_id` for `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Unemits `collectible` for the trait `trait_type`, `count` times.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Unemits all entries of `collectibles` for the trait `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Tries to read the cell `index` owned by `current_task`.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` owned by `task`.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Updates the cell `index` owned by `task` with `content`.
    ///
    /// NOTE(review): the exact meaning of `updated_key_hashes`, `content_hash` and
    /// `verification_mode` is defined by the backend and is not visible here.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);

    /// Connects `task`.
    ///
    /// NOTE(review): presumably connects it as a child of the current task — confirm.
    fn connect_task(&self, task: TaskId);

    /// Spawns the future `f` detached; intended for tests, as the name indicates.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Returns a boxed future that stops the instance and waits for the stop to complete.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally restricted to `event_types`, and returns
    /// the receiving end of the channel.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns whether dependencies are being tracked.
    fn is_tracking_dependencies(&self) -> bool;
}
207
/// A thin wrapper that keeps a value of type `T` in a private field.
///
/// Constructing the wrapper and borrowing the inner value both go through `unsafe` functions;
/// only [`Unused::into`] is safe.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` in an `Unused`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the invariant the caller has to uphold is not visible in this block;
    /// presumably the wrapped value must not have been used yet — confirm against callers.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must make sure the value stays unused — confirm
    /// against callers.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
237
/// Summary of an aggregated update, as returned by `TurboTasks::aggregated_update_info`.
///
/// The private placeholder field prevents code outside this module from constructing the
/// struct with a struct literal.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated time between the first foreground job starting and the last one finishing.
    pub duration: Duration,
    /// Accumulated number of tasks scheduled during that time.
    pub tasks: usize,
    /// Invalidation reasons collected since the last update info was taken.
    pub reasons: InvalidationReasonSet,
    /// Private unit field; carries no data.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
246
/// Persistence kind of a task; passed along with every call and stored in
/// `RawVc::LocalOutput`.
///
/// NOTE(review): the precise storage semantics of each variant are defined by the backend and
/// are not visible in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// Displayed as `persistent`.
    Persistent,

    /// Displayed as `transient`.
    Transient,
}
260
261impl Display for TaskPersistence {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 match self {
264 TaskPersistence::Persistent => write!(f, "persistent"),
265 TaskPersistence::Transient => write!(f, "transient"),
266 }
267 }
268}
269
/// Whether the inputs of a call are already resolved.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum InputResolution {
    /// The inputs are resolved.
    Resolved,
    /// The inputs are not (or not known to be) resolved.
    Unresolved,
}

impl InputResolution {
    /// Converts a boolean flag into the matching variant (`true` maps to `Resolved`).
    #[inline]
    pub fn from_is_resolved(is_resolved: bool) -> Self {
        match is_resolved {
            true => Self::Resolved,
            false => Self::Unresolved,
        }
    }

    /// Returns `true` only for [`InputResolution::Resolved`].
    #[inline]
    pub fn is_resolved(self) -> bool {
        self == Self::Resolved
    }
}
296
/// Consistency level requested when reading a task output.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default level. `TurboTasks::try_read_task_output` runs a debug assertion that the
    /// read does not happen in a top-level task when this level is requested.
    #[default]
    Eventual,
    /// NOTE(review): presumably waits until the task and its children have settled before
    /// returning — the behavior is implemented by the backend and not visible here.
    Strong,
}
309
/// Controls whether a cell read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// The read is always tracked.
    Tracked {
        /// Optional key associated with the tracked read.
        key: Option<u64>,
    },
    /// The read is tracked only when it resulted in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Tracked { .. } => true,
            Self::TrackOnlyError => is_err,
            Self::Untracked => false,
        }
    }

    /// Returns the key of a [`ReadCellTracking::Tracked`] read, `None` for every other variant.
    pub fn key(&self) -> Option<u64> {
        match *self {
            Self::Tracked { key } => key,
            Self::TrackOnlyError | Self::Untracked => None,
        }
    }
}

/// The default is a tracked read without a key.
impl Default for ReadCellTracking {
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

/// Human-readable description of the tracking mode.
impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
363
/// Controls whether a read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// The read is always tracked. This is the default.
    #[default]
    Tracked,
    /// The read is tracked only when it resulted in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it produced an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Tracked => true,
            Self::TrackOnlyError => is_err,
            Self::Untracked => false,
        }
    }
}

/// Human-readable description of the tracking mode.
impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
400
/// Priority attached to scheduled tasks and handed to the priority runner.
///
/// The derived ordering compares variants in declaration order
/// (`Initial < Invalidation { .. } < Recomputation`). Within `Invalidation` the number is
/// wrapped in [`Reverse`], so a larger number compares as smaller.
///
/// NOTE(review): whether the runner executes greater or smaller priorities first is decided by
/// `PriorityRunner` and is not visible in this file.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// The default priority; also used for root and once tasks spawned in this file.
    #[default]
    Initial,
    /// Priority of an invalidation, carrying a numeric level.
    Invalidation {
        /// The numeric level, ordered in reverse.
        priority: Reverse<u32>,
    },
    /// Priority of a recomputation.
    Recomputation,
}
410
411impl TaskPriority {
412 pub fn invalidation(priority: u32) -> Self {
413 Self::Invalidation {
414 priority: Reverse(priority),
415 }
416 }
417
418 pub fn initial() -> Self {
419 Self::Initial
420 }
421
422 pub fn leaf() -> Self {
423 Self::Invalidation {
424 priority: Reverse(0),
425 }
426 }
427
428 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
429 match self {
430 TaskPriority::Initial => parent_priority,
431 TaskPriority::Invalidation { priority } => {
432 if let TaskPriority::Invalidation {
433 priority: parent_priority,
434 } = parent_priority
435 && priority.0 < parent_priority.0
436 {
437 Self::Invalidation {
438 priority: Reverse(parent_priority.0.saturating_add(1)),
439 }
440 } else {
441 *self
442 }
443 }
444 TaskPriority::Recomputation => TaskPriority::Recomputation,
445 }
446 }
447}
448
449impl Display for TaskPriority {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 match self {
452 TaskPriority::Initial => write!(f, "initial"),
453 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
454 TaskPriority::Recomputation => write!(f, "recomputation"),
455 }
456 }
457}
458
/// A unit of work handed to the priority runner and executed by `TurboTasksExecutor`.
enum ScheduledTask {
    /// Execution of a regular task identified by its id.
    Task {
        /// The task to execute.
        task_id: TaskId,
        /// The tracing span that was current when the task was scheduled.
        span: Span,
    },
    /// Execution of a local task that runs with the state of the task that scheduled it.
    LocalTask {
        /// What the local task does (kind, receiver and arguments).
        ty: LocalTaskSpec,
        /// Persistence passed on to the resolve call.
        persistence: TaskPersistence,
        /// Id under which the result is recorded in the local task tracker.
        local_task_id: LocalTaskId,
        /// Shared state of the task that scheduled this local task.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// The tracing span that was current when the local task was scheduled.
        span: Span,
    },
}
472
/// The turbo-tasks manager: owns the backend, schedules tasks on a priority runner and keeps
/// track of running foreground and background jobs.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference set up through `Arc::new_cyclic`; upgraded by `pin`.
    this: Weak<Self>,
    /// The backend all task operations are delegated to.
    backend: B,
    /// Source of execution ids, one per `run` call and per task execution.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; checked before executing tasks and background jobs.
    stopped: AtomicBool,
    /// Number of foreground jobs that have begun but not finished.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs that have begun but not finished.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of tasks scheduled since the foreground jobs last dropped to zero.
    scheduled_tasks: AtomicUsize,
    /// Runner that executes `ScheduledTask`s ordered by `TaskPriority`.
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Time at which the foreground job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` since the last update info was taken, plus the
    /// invalidation reasons collected so far.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job count goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job count drops to zero.
    event_foreground_done: Event,
    /// Notified when the background job count drops to zero.
    event_background_done: Event,
    /// Queue through which compilation events are sent.
    compilation_events: CompilationEventQueue,
}
494
/// Per-execution state of the currently running task, stored in the `CURRENT_TASK_STATE`
/// task-local and shared with the local tasks scheduled from it.
struct CurrentTaskState {
    /// Id of the executing task; `None` for the temporary state created by `new_temporary`.
    task_id: Option<TaskId>,
    /// Id of this execution; local tasks must be used within the same execution.
    execution_id: ExecutionId,
    /// Priority the task was scheduled with; local tasks are scheduled with the same priority.
    priority: TaskPriority,

    /// Reported to the backend when the task execution completes; starts out `false`.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Reported to the backend when the task execution completes; starts out `false`.
    has_invalidator: bool,

    /// Whether this state belongs to a top-level task.
    in_top_level_task: bool,

    /// Per-value-type cell counters; `None` for the temporary state, and taken out when the
    /// task execution completes.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracker for the local tasks created during this execution.
    local_tasks: LocalTaskTracker,
}
530
531impl CurrentTaskState {
532 fn new(
533 task_id: TaskId,
534 execution_id: ExecutionId,
535 priority: TaskPriority,
536 in_top_level_task: bool,
537 ) -> Self {
538 Self {
539 task_id: Some(task_id),
540 execution_id,
541 priority,
542 #[cfg(feature = "verify_determinism")]
543 stateful: false,
544 has_invalidator: false,
545 in_top_level_task,
546 cell_counters: Some(AutoMap::default()),
547 local_tasks: LocalTaskTracker::new(),
548 }
549 }
550
551 fn new_temporary(
552 execution_id: ExecutionId,
553 priority: TaskPriority,
554 in_top_level_task: bool,
555 ) -> Self {
556 Self {
557 task_id: None,
558 execution_id,
559 priority,
560 #[cfg(feature = "verify_determinism")]
561 stateful: false,
562 has_invalidator: false,
563 in_top_level_task,
564 cell_counters: None,
565 local_tasks: LocalTaskTracker::new(),
566 }
567 }
568
569 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
570 if self.execution_id != expected_execution_id {
571 panic!(
572 "Local tasks can only be scheduled/awaited within the same execution of the \
573 parent task that created them"
574 );
575 }
576 }
577}
578
task_local! {
    /// The turbo-tasks instance the current tokio task belongs to. Scoped in `TurboTasks::run`,
    /// `TurboTasks::schedule_background_job` and by the executor around every scheduled task.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the currently executing task (or the temporary state of `TurboTasks::run`).
    /// Local tasks run with the state of the task that scheduled them.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// NOTE(review): judging by the name, suppresses the check that eventually consistent reads
    /// do not happen in a top-level task — the code reading this flag is not visible here.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
592
593impl<B: Backend + 'static> TurboTasks<B> {
594 pub fn new(backend: B) -> Arc<Self> {
600 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
601 let this = Arc::new_cyclic(|this| Self {
602 this: this.clone(),
603 backend,
604 execution_id_factory,
605 stopped: AtomicBool::new(false),
606 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
607 currently_scheduled_background_jobs: AtomicUsize::new(0),
608 scheduled_tasks: AtomicUsize::new(0),
609 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
610 start: Default::default(),
611 aggregated_update: Default::default(),
612 event_foreground_done: Event::new(|| {
613 || "TurboTasks::event_foreground_done".to_string()
614 }),
615 event_foreground_start: Event::new(|| {
616 || "TurboTasks::event_foreground_start".to_string()
617 }),
618 event_background_done: Event::new(|| {
619 || "TurboTasks::event_background_done".to_string()
620 }),
621 compilation_events: CompilationEventQueue::default(),
622 });
623 this.backend.startup(&*this);
624 this
625 }
626
627 pub fn pin(&self) -> Arc<Self> {
628 self.this.upgrade().unwrap()
629 }
630
631 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
633 where
634 T: ?Sized,
635 F: Fn() -> Fut + Send + Sync + Clone + 'static,
636 Fut: Future<Output = Result<Vc<T>>> + Send,
637 {
638 let id = self.backend.create_transient_task(
639 TransientTaskType::Root(Box::new(move || {
640 let functor = functor.clone();
641 Box::pin(async move {
642 mark_top_level_task();
643 let raw_vc = functor().await?.node;
644 raw_vc.to_non_local().await
645 })
646 })),
647 self,
648 );
649 self.schedule(id, TaskPriority::initial());
650 id
651 }
652
    /// Asks the backend to dispose the root task `task_id`.
    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }
656
657 #[track_caller]
661 fn spawn_once_task<T, Fut>(&self, future: Fut)
662 where
663 T: ?Sized,
664 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
665 {
666 let id = self.backend.create_transient_task(
667 TransientTaskType::Once(Box::pin(async move {
668 mark_top_level_task();
669 let raw_vc = future.await?.node;
670 raw_vc.to_non_local().await
671 })),
672 self,
673 );
674 self.schedule(id, TaskPriority::initial());
675 }
676
677 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
678 &self,
679 future: impl Future<Output = Result<T>> + Send + 'static,
680 ) -> Result<T> {
681 let (tx, rx) = tokio::sync::oneshot::channel();
682 self.spawn_once_task(async move {
683 mark_top_level_task();
684 let result = future.await;
685 tx.send(result)
686 .map_err(|_| anyhow!("unable to send result"))?;
687 Ok(Completion::new())
688 });
689
690 rx.await?
691 }
692
693 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
694 pub async fn run<T: TraceRawVcs + Send + 'static>(
695 &self,
696 future: impl Future<Output = Result<T>> + Send + 'static,
697 ) -> Result<T, TurboTasksExecutionError> {
698 self.begin_foreground_job();
699 let execution_id = self.execution_id_factory.wrapping_get();
701 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
702 execution_id,
703 TaskPriority::initial(),
704 true, )));
706
707 let result = TURBO_TASKS
708 .scope(
709 self.pin(),
710 CURRENT_TASK_STATE.scope(current_task_state, async {
711 let result = CaptureFuture::new(future).await;
712
713 wait_for_local_tasks().await;
715
716 match result {
717 Ok(Ok(value)) => Ok(value),
718 Ok(Err(err)) => Err(err.into()),
719 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
720 }
721 }),
722 )
723 .await;
724 self.finish_foreground_job();
725 result
726 }
727
728 pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
729 let this = self.pin();
730 tokio::spawn(async move {
731 this.pin()
732 .run_once(async move {
733 this.finish_foreground_job();
734 future.await;
735 this.begin_foreground_job();
736 Ok(())
737 })
738 .await
739 .unwrap()
740 });
741 }
742
743 pub(crate) fn native_call(
744 &self,
745 native_fn: &'static NativeFunction,
746 this: Option<RawVc>,
747 arg: &mut dyn DynTaskInputsStorage,
748 persistence: TaskPersistence,
749 ) -> RawVc {
750 RawVc::TaskOutput(self.backend.get_or_create_task(
751 native_fn,
752 this,
753 arg,
754 current_task_if_available("turbo_function calls"),
755 persistence,
756 self,
757 ))
758 }
759
760 pub fn dynamic_call(
761 &self,
762 native_fn: &'static NativeFunction,
763 this: Option<RawVc>,
764 arg: &mut dyn DynTaskInputsStorage,
765 inputs_resolved: InputResolution,
766 persistence: TaskPersistence,
767 ) -> RawVc {
768 if inputs_resolved.is_resolved() && this.is_none_or(|this| this.is_resolved()) {
769 return self.native_call(native_fn, this, arg, persistence);
770 }
771 let arg = arg.take_box();
773 let task_type = LocalTaskSpec {
774 task_type: LocalTaskType::ResolveNative { native_fn },
775 this,
776 arg,
777 };
778 self.schedule_local_task(task_type, persistence)
779 }
780
781 pub fn trait_call(
782 &self,
783 trait_method: &'static TraitMethod,
784 this: RawVc,
785 arg: &mut dyn DynTaskInputsStorage,
786 inputs_resolved: InputResolution,
787 persistence: TaskPersistence,
788 ) -> RawVc {
789 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
793 match registry::get_value_type(type_id).get_trait_method(trait_method) {
794 Some(native_fn) => {
795 if let Some(filter) = native_fn.arg_meta.filter_owned {
796 let (resolved, mut arg) = (filter)(arg);
797 return self.dynamic_call(
798 native_fn,
799 Some(this),
800 &mut arg,
801 resolved,
802 persistence,
803 );
804 } else {
805 return self.dynamic_call(
806 native_fn,
807 Some(this),
808 arg,
809 inputs_resolved,
810 persistence,
811 );
812 }
813 }
814 None => {
815 }
819 }
820 }
821
822 let task_type = LocalTaskSpec {
824 task_type: LocalTaskType::ResolveTrait { trait_method },
825 this: Some(this),
826 arg: arg.take_box(),
827 };
828
829 self.schedule_local_task(task_type, persistence)
830 }
831
832 #[track_caller]
833 pub fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
834 self.begin_foreground_job();
835 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
836
837 self.priority_runner.schedule(
838 &self.pin(),
839 ScheduledTask::Task {
840 task_id,
841 span: Span::current(),
842 },
843 priority,
844 );
845 }
846
847 fn schedule_local_task(
848 &self,
849 ty: LocalTaskSpec,
850 persistence: TaskPersistence,
852 ) -> RawVc {
853 let task_type = ty.task_type;
854 let (global_task_state, execution_id, priority, local_task_id) =
855 CURRENT_TASK_STATE.with(|gts| {
856 let mut gts_write = gts.write().unwrap();
857 let local_task_id = gts_write.local_tasks.create(task_type);
858 (
859 Arc::clone(gts),
860 gts_write.execution_id,
861 gts_write.priority,
862 local_task_id,
863 )
864 });
865
866 self.priority_runner.schedule(
867 &self.pin(),
868 ScheduledTask::LocalTask {
869 ty,
870 persistence,
871 local_task_id,
872 global_task_state,
873 span: Span::current(),
874 },
875 priority,
876 );
877
878 RawVc::LocalOutput(execution_id, local_task_id, persistence)
879 }
880
881 fn begin_foreground_job(&self) {
882 if self
883 .currently_scheduled_foreground_jobs
884 .fetch_add(1, Ordering::AcqRel)
885 == 0
886 {
887 *self.start.lock().unwrap() = Some(Instant::now());
888 self.event_foreground_start.notify(usize::MAX);
889 self.backend.idle_end(self);
890 }
891 }
892
893 fn finish_foreground_job(&self) {
894 if self
895 .currently_scheduled_foreground_jobs
896 .fetch_sub(1, Ordering::AcqRel)
897 == 1
898 {
899 self.backend.idle_start(self);
900 let total = self.scheduled_tasks.load(Ordering::Acquire);
903 self.scheduled_tasks.store(0, Ordering::Release);
904 if let Some(start) = *self.start.lock().unwrap() {
905 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
906 if let Some(update) = update.as_mut() {
907 update.0 += start.elapsed();
908 update.1 += total;
909 } else {
910 *update = Some((start.elapsed(), total));
911 }
912 }
913 self.event_foreground_done.notify(usize::MAX);
914 }
915 }
916
917 fn begin_background_job(&self) {
918 self.currently_scheduled_background_jobs
919 .fetch_add(1, Ordering::Relaxed);
920 }
921
922 fn finish_background_job(&self) {
923 if self
924 .currently_scheduled_background_jobs
925 .fetch_sub(1, Ordering::Relaxed)
926 == 1
927 {
928 self.event_background_done.notify(usize::MAX);
929 }
930 }
931
    /// Returns the number of foreground jobs that have begun but not yet finished.
    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
    }
936
937 pub async fn wait_task_completion(
949 &self,
950 id: TaskId,
951 consistency: ReadConsistency,
952 ) -> Result<()> {
953 read_task_output(
954 self,
955 id,
956 ReadOutputOptions {
957 tracking: ReadTracking::Untracked,
959 consistency,
960 },
961 )
962 .await?;
963 Ok(())
964 }
965
    /// Returns the aggregated update info, waiting without a timeout (`Duration::MAX`) until
    /// one is available. See `aggregated_update_info` for the meaning of `aggregation`.
    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
        // With `Duration::MAX` as timeout `aggregated_update_info` never takes its timeout
        // branch, which is the only place where it returns `None`.
        self.aggregated_update_info(aggregation, Duration::MAX)
            .await
            .unwrap()
    }
973
    /// Takes the aggregated update info, waiting for foreground work to finish if necessary.
    ///
    /// - With a zero `aggregation`, an already recorded update is returned immediately;
    ///   otherwise the function waits for the foreground jobs to finish.
    /// - With a non-zero `aggregation`, the function waits only when no update is recorded yet,
    ///   and afterwards keeps waiting until no foreground-done event has occurred for
    ///   `aggregation`.
    /// - With `timeout == Duration::MAX` the wait is unbounded. Otherwise, if no foreground job
    ///   is running, the function first waits for one to start, and then waits at most
    ///   `timeout` for the foreground jobs to finish.
    ///
    /// Returns `None` when the timeout is zero or elapses.
    ///
    /// # Panics
    ///
    /// Panics when no update is recorded after waiting, which the panic message attributes to
    /// concurrent calls of this function.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        // The listener is created before the state is inspected.
        let listener = self
            .event_foreground_done
            .listen_with_note(|| || "wait for update info".to_string());
        let wait_for_finish = {
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                if let Some((duration, tasks)) = update.take() {
                    // Fast path: an update is already recorded and no aggregation is wanted.
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                // Unbounded wait for the foreground jobs to finish.
                listener.await;
            } else {
                // Wait for a foreground job to start (if none is running), then wait for the
                // foreground jobs to finish or for the timeout to elapse.
                let start_listener = self
                    .event_foreground_start
                    .listen_with_note(|| || "wait for update info".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    == 0
                {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    // Timed out.
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                        // Another batch finished: the next loop iteration creates a fresh
                        // sleep, which restarts the aggregation window.
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not called concurrently")
        }
    }
1050
1051 pub async fn wait_background_done(&self) {
1052 let listener = self.event_background_done.listen();
1053 if self
1054 .currently_scheduled_background_jobs
1055 .load(Ordering::Acquire)
1056 != 0
1057 {
1058 listener.await;
1059 }
1060 }
1061
1062 pub async fn stop_and_wait(&self) {
1063 turbo_tasks_future_scope(self.pin(), async move {
1064 self.backend.stopping(self);
1065 self.stopped.store(true, Ordering::Release);
1066 {
1067 let listener = self
1068 .event_foreground_done
1069 .listen_with_note(|| || "wait for stop".to_string());
1070 if self
1071 .currently_scheduled_foreground_jobs
1072 .load(Ordering::Acquire)
1073 != 0
1074 {
1075 listener.await;
1076 }
1077 }
1078 {
1079 let listener = self.event_background_done.listen();
1080 if self
1081 .currently_scheduled_background_jobs
1082 .load(Ordering::Acquire)
1083 != 0
1084 {
1085 listener.await;
1086 }
1087 }
1088 self.backend.stop(self);
1089 })
1090 .await;
1091 }
1092
    /// Spawns `func` as a background job on tokio, with `TURBO_TASKS` set to this instance.
    ///
    /// The background job counter is incremented before spawning and decremented when the job
    /// ends. `func` is skipped when the instance has been stopped by the time the job starts.
    /// `func` receives the `Arc<TurboTasks<B>>` by value and has to hand it back as its result.
    #[track_caller]
    pub(crate) fn schedule_background_job<T>(&self, func: T)
    where
        T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
        T::CallOnceFuture: Send,
    {
        let mut this = self.pin();
        self.begin_background_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        // `func` takes ownership of the `Arc` and returns it when done.
                        this = func(this).await;
                    }
                    this.finish_background_job();
                })
                .in_current_span(),
        );
    }
1112
1113 fn finish_current_task_state(&self) -> FinishedTaskState {
1114 CURRENT_TASK_STATE.with(|cell| {
1115 let current_task_state = &*cell.write().unwrap();
1116 FinishedTaskState {
1117 #[cfg(feature = "verify_determinism")]
1118 stateful: current_task_state.stateful,
1119 has_invalidator: current_task_state.has_invalidator,
1120 }
1121 })
1122 }
1123
    /// Returns a reference to the backend.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1127
1128 pub fn get_current_task_priority(&self) -> TaskPriority {
1129 CURRENT_TASK_STATE
1130 .try_with(|task_state| task_state.read().unwrap().priority)
1131 .unwrap_or(TaskPriority::initial())
1132 }
1133
1134 pub fn is_idle(&self) -> bool {
1135 self.currently_scheduled_foreground_jobs
1136 .load(Ordering::Acquire)
1137 == 0
1138 }
1139
    /// Schedules the backend job `job` as a background job; the job is run through the
    /// backend's `run_backend_job`.
    #[track_caller]
    pub fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(async move |this| {
            this.backend.run_backend_job(job, &*this).await;
            // Hand the `Arc` back as `schedule_background_job` requires.
            this
        })
    }
1147}
1148
/// The `Executor` used by the priority runner of `TurboTasks` to run `ScheduledTask`s.
struct TurboTasksExecutor;
1150
1151async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1156 match AssertUnwindSafe(f).catch_unwind().await {
1157 Ok(r) => r,
1158 Err(_) => {
1159 eprintln!(
1160 "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1161 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1162 https://github.com/vercel/next.js/discussions and include the panic message \
1163 and stack trace above.\n\nAborting."
1164 );
1165 abort();
1166 }
1167 }
1168}
1169
/// Turns a `ScheduledTask` into the future that the priority runner executes.
///
/// Both kinds of scheduled task run with `TURBO_TASKS` and `CURRENT_TASK_STATE` set, inside
/// the span captured at scheduling time, and inside `abort_on_panic`.
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future for `scheduled_task`.
    ///
    /// - `ScheduledTask::Task`: starts the task execution through the backend, runs it, waits
    ///   for its local tasks, reports the result to the backend, re-schedules the task when
    ///   the backend returns a priority, and finally finishes the foreground job that
    ///   `TurboTasks::schedule` began.
    /// - `ScheduledTask::LocalTask`: runs the native or trait resolution and records the
    ///   outcome in the local task tracker of the task that scheduled it.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One clone is moved into the future, the other is used for `TURBO_TASKS`.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        let execution_id = this.execution_id_factory.wrapping_get();
                        // Fresh state for this execution; not a top-level task.
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false,
                        )));
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            // `?` ends the execution with `None` when the backend does not
                            // hand out an execution spec.
                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // Local tasks are awaited before the result is converted
                                // and reported.
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                // The counters are taken out of the state; `unwrap` relies on
                                // `CurrentTaskState::new` having set them.
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // The backend returned a priority: schedule the task again. This
                            // begins a new foreground job before the current one is finished.
                            this.schedule(task_id, stale_priority);
                        }
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        // `TurboTasks::trait_call` always sets `this` for
                                        // `ResolveTrait` specs.
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Record the outcome in the tracker of the scheduling task.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // The local task runs with the state of the task that scheduled it.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1323
/// Flags copied out of `CurrentTaskState` by `TurboTasks::finish_current_task_state` and
/// passed to the backend's `task_execution_completed`.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1333
/// Trait-object-friendly front for the inherent methods of `TurboTasks`; most methods forward
/// directly to the inherent method of the same name.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to the inherent `dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, inputs_resolved, persistence)
    }
    /// Forwards to the inherent `native_call`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn DynTaskInputsStorage,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `trait_call`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn DynTaskInputsStorage,
        inputs_resolved: InputResolution,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, inputs_resolved, persistence)
    }

    /// Boxes a future that runs `future` through the inherent `run`, keeping this instance
    /// alive through a strong reference.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Boxes a future that runs `future` through the inherent `run_once`, keeping this
    /// instance alive through a strong reference.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Records `reason` in the aggregated update's reason set (immediately, not when the
    /// returned future is polled) and then behaves like `run_once`.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Scoped so that the lock is released before the future is created.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Forwards to the inherent `start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    /// Sends `event` to the compilation event queue; a failure is logged as a warning.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    /// Returns the name the backend reports for `task`.
    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1412
1413impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1414 #[instrument(level = "info", skip_all, name = "invalidate")]
1415 fn invalidate(&self, task: TaskId) {
1416 self.backend.invalidate_task(task, self);
1417 }
1418
1419 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1420 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1421 {
1422 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1423 reason_set.insert(reason);
1424 }
1425 self.backend.invalidate_task(task, self);
1426 }
1427
1428 fn invalidate_serialization(&self, task: TaskId) {
1429 self.backend.invalidate_serialization(task, self);
1430 }
1431
1432 #[track_caller]
1433 fn try_read_task_output(
1434 &self,
1435 task: TaskId,
1436 options: ReadOutputOptions,
1437 ) -> Result<Result<RawVc, EventListener>> {
1438 if options.consistency == ReadConsistency::Eventual {
1439 debug_assert_not_in_top_level_task("read_task_output");
1440 }
1441 self.backend.try_read_task_output(
1442 task,
1443 current_task_if_available("reading Vcs"),
1444 options,
1445 self,
1446 )
1447 }
1448
1449 #[track_caller]
1450 fn try_read_task_cell(
1451 &self,
1452 task: TaskId,
1453 index: CellId,
1454 options: ReadCellOptions,
1455 ) -> Result<Result<TypedCellContent, EventListener>> {
1456 let reader = current_task_if_available("reading Vcs");
1457 self.backend
1458 .try_read_task_cell(task, index, reader, options, self)
1459 }
1460
1461 fn try_read_own_task_cell(
1462 &self,
1463 current_task: TaskId,
1464 index: CellId,
1465 ) -> Result<TypedCellContent> {
1466 self.backend
1467 .try_read_own_task_cell(current_task, index, self)
1468 }
1469
1470 #[track_caller]
1471 fn try_read_local_output(
1472 &self,
1473 execution_id: ExecutionId,
1474 local_task_id: LocalTaskId,
1475 ) -> Result<Result<RawVc, EventListener>> {
1476 debug_assert_not_in_top_level_task("read_local_output");
1477 CURRENT_TASK_STATE.with(|gts| {
1478 let gts_read = gts.read().unwrap();
1479
1480 gts_read.assert_execution_id(execution_id);
1485
1486 match gts_read.local_tasks.get(local_task_id) {
1487 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1488 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1489 }
1490 })
1491 }
1492
1493 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1494 self.backend.read_task_collectibles(
1497 task,
1498 trait_id,
1499 current_task_if_available("reading collectibles"),
1500 self,
1501 )
1502 }
1503
1504 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1505 self.backend.emit_collectible(
1506 trait_type,
1507 collectible,
1508 current_task("emitting collectible"),
1509 self,
1510 );
1511 }
1512
1513 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1514 self.backend.unemit_collectible(
1515 trait_type,
1516 collectible,
1517 count,
1518 current_task("emitting collectible"),
1519 self,
1520 );
1521 }
1522
1523 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1524 for (&collectible, &count) in collectibles {
1525 if count > 0 {
1526 self.backend.unemit_collectible(
1527 trait_type,
1528 collectible,
1529 count as u32,
1530 current_task("emitting collectible"),
1531 self,
1532 );
1533 }
1534 }
1535 }
1536
1537 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1538 self.try_read_own_task_cell(task, index)
1539 }
1540
1541 fn update_own_task_cell(
1542 &self,
1543 task: TaskId,
1544 index: CellId,
1545 content: CellContent,
1546 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1547 content_hash: Option<CellHash>,
1548 verification_mode: VerificationMode,
1549 ) {
1550 self.backend.update_task_cell(
1551 task,
1552 index,
1553 content,
1554 updated_key_hashes,
1555 content_hash,
1556 verification_mode,
1557 self,
1558 );
1559 }
1560
1561 fn connect_task(&self, task: TaskId) {
1562 self.backend
1563 .connect_task(task, current_task_if_available("connecting task"), self);
1564 }
1565
1566 fn mark_own_task_as_finished(&self, task: TaskId) {
1567 self.backend.mark_own_task_as_finished(task, self);
1568 }
1569
1570 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1573 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1576 global_task_state
1577 .write()
1578 .unwrap()
1579 .local_tasks
1580 .register_detached();
1581 let wrapped = async move {
1582 struct DropGuard;
1584 impl Drop for DropGuard {
1585 fn drop(&mut self) {
1586 CURRENT_TASK_STATE
1587 .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1588 }
1589 }
1590 let _guard = DropGuard;
1591 fut.await;
1592 };
1593 tokio::spawn(TURBO_TASKS.scope(
1594 turbo_tasks(),
1595 CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1596 ));
1597 }
1598
1599 fn task_statistics(&self) -> &TaskStatisticsApi {
1600 self.backend.task_statistics()
1601 }
1602
1603 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1604 let this = self.pin();
1605 Box::pin(async move {
1606 this.stop_and_wait().await;
1607 })
1608 }
1609
1610 fn subscribe_to_compilation_events(
1611 &self,
1612 event_types: Option<Vec<String>>,
1613 ) -> Receiver<Arc<dyn CompilationEvent>> {
1614 self.compilation_events.subscribe(event_types)
1615 }
1616
1617 fn is_tracking_dependencies(&self) -> bool {
1618 self.backend.is_tracking_dependencies()
1619 }
1620}
1621
1622async fn wait_for_local_tasks() {
1623 let listener =
1624 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1625 let Some(listener) = listener else {
1626 return;
1627 };
1628 listener.await;
1629}
1630
1631pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1632 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1633 Ok(id) => id,
1634 Err(_) => panic!(
1635 "{from} can only be used in the context of a turbo_tasks task execution or \
1636 turbo_tasks run"
1637 ),
1638 }
1639}
1640
1641pub(crate) fn current_task(from: &str) -> TaskId {
1642 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1643 Ok(Some(id)) => id,
1644 Ok(None) | Err(_) => {
1645 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1646 }
1647 }
1648}
1649
1650#[track_caller]
1653pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1654 if !cfg!(debug_assertions) {
1655 return;
1656 }
1657
1658 let in_top_level = CURRENT_TASK_STATE
1659 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1660 .unwrap_or(true);
1661 if !in_top_level {
1662 panic!("{message}");
1663 }
1664}
1665
1666#[track_caller]
1667pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1668 if !cfg!(debug_assertions) {
1669 return;
1670 }
1671
1672 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1675 .try_with(|&suppressed| suppressed)
1676 .unwrap_or(false);
1677 if suppressed {
1678 return;
1679 }
1680
1681 let in_top_level = CURRENT_TASK_STATE
1682 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1683 .unwrap_or(false);
1684 if in_top_level {
1685 panic!(
1686 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1687 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1688 reads to avoid leaking inconsistent return values."
1689 );
1690 }
1691}
1692
1693pub async fn run<T: Send + 'static>(
1694 tt: Arc<dyn TurboTasksApi>,
1695 future: impl Future<Output = Result<T>> + Send + 'static,
1696) -> Result<T> {
1697 let (tx, rx) = tokio::sync::oneshot::channel();
1698
1699 tt.run(Box::pin(async move {
1700 let result = future.await?;
1701 tx.send(result)
1702 .map_err(|_| anyhow!("unable to send result"))?;
1703 Ok(())
1704 }))
1705 .await?;
1706
1707 Ok(rx.await?)
1708}
1709
1710pub async fn run_once<T: Send + 'static>(
1711 tt: Arc<dyn TurboTasksApi>,
1712 future: impl Future<Output = Result<T>> + Send + 'static,
1713) -> Result<T> {
1714 let (tx, rx) = tokio::sync::oneshot::channel();
1715
1716 tt.run_once(Box::pin(async move {
1717 let result = future.await?;
1718 tx.send(result)
1719 .map_err(|_| anyhow!("unable to send result"))?;
1720 Ok(())
1721 }))
1722 .await?;
1723
1724 Ok(rx.await?)
1725}
1726
1727pub async fn run_once_with_reason<T: Send + 'static>(
1728 tt: Arc<dyn TurboTasksApi>,
1729 reason: impl InvalidationReason,
1730 future: impl Future<Output = Result<T>> + Send + 'static,
1731) -> Result<T> {
1732 let (tx, rx) = tokio::sync::oneshot::channel();
1733
1734 tt.run_once_with_reason(
1735 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1736 Box::pin(async move {
1737 let result = future.await?;
1738 tx.send(result)
1739 .map_err(|_| anyhow!("unable to send result"))?;
1740 Ok(())
1741 }),
1742 )
1743 .await?;
1744
1745 Ok(rx.await?)
1746}
1747
1748pub fn dynamic_call(
1750 func: &'static NativeFunction,
1751 this: Option<RawVc>,
1752 arg: &mut dyn DynTaskInputsStorage,
1753 inputs_resolved: InputResolution,
1754 persistence: TaskPersistence,
1755) -> RawVc {
1756 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, inputs_resolved, persistence))
1757}
1758
1759pub fn trait_call(
1761 trait_method: &'static TraitMethod,
1762 this: RawVc,
1763 arg: &mut dyn DynTaskInputsStorage,
1764 inputs_resolved: InputResolution,
1765 persistence: TaskPersistence,
1766) -> RawVc {
1767 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, inputs_resolved, persistence))
1768}
1769
1770pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1771 TURBO_TASKS.with(|arc| arc.clone())
1772}
1773
1774pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1775 TURBO_TASKS.with(Arc::downgrade)
1776}
1777
1778pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1779 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1780}
1781
1782pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1783 TURBO_TASKS.with(|arc| func(arc))
1784}
1785
/// Runs the synchronous closure `f` with `tt` installed as the `TURBO_TASKS` value and returns
/// the closure's result.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1789
/// Wraps the future `f` so that `tt` is installed as the `TURBO_TASKS` value while it is polled.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1796
1797pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1802 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1803}
1804
1805pub fn mark_finished() {
1808 with_turbo_tasks(|tt| {
1809 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1810 });
1811}
1812
1813pub fn get_serialization_invalidator() -> SerializationInvalidator {
1819 CURRENT_TASK_STATE.with(|cell| {
1820 let CurrentTaskState {
1821 task_id,
1822 #[cfg(feature = "verify_determinism")]
1823 stateful,
1824 ..
1825 } = &mut *cell.write().unwrap();
1826 #[cfg(feature = "verify_determinism")]
1827 {
1828 *stateful = true;
1829 }
1830 let Some(task_id) = *task_id else {
1831 panic!(
1832 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1833 task execution"
1834 );
1835 };
1836 SerializationInvalidator::new(task_id)
1837 })
1838}
1839
1840pub fn mark_invalidator() {
1841 CURRENT_TASK_STATE.with(|cell| {
1842 let CurrentTaskState {
1843 has_invalidator, ..
1844 } = &mut *cell.write().unwrap();
1845 *has_invalidator = true;
1846 })
1847}
1848
/// Sets the `stateful` flag on the current task state.
///
/// The body only exists with the `verify_determinism` feature; without it this is a no-op.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|cell| {
            cell.write().unwrap().stateful = true;
        });
    }
}
1864
1865pub fn mark_top_level_task() {
1869 if cfg!(debug_assertions) {
1870 CURRENT_TASK_STATE.with(|cell| {
1871 cell.write().unwrap().in_top_level_task = true;
1872 })
1873 }
1874}
1875
1876pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1887 if cfg!(debug_assertions) {
1888 CURRENT_TASK_STATE.with(|cell| {
1889 cell.write().unwrap().in_top_level_task = false;
1890 })
1891 }
1892}
1893
/// Currently a no-op: the body is empty.
pub fn prevent_gc() {
}
1897
1898pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1899 with_turbo_tasks(|tt| {
1900 let raw_vc = collectible.node.node;
1901 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1902 })
1903}
1904
1905pub(crate) async fn read_task_output(
1906 this: &dyn TurboTasksApi,
1907 id: TaskId,
1908 options: ReadOutputOptions,
1909) -> Result<RawVc> {
1910 loop {
1911 match this.try_read_task_output(id, options)? {
1912 Ok(result) => return Ok(result),
1913 Err(listener) => listener.await,
1914 }
1915 }
1916}
1917
/// A reference to one cell of a task, identified by the task id and the cell id.
///
/// `find_cell_by_id` creates these for the currently executing task.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owns the cell.
    current_task: TaskId,
    /// The cell's id (value type id plus index).
    index: CellId,
}
1928
/// Shorthand for the `Target` type of `T`'s [`VcRead`] implementation.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
1930
1931impl CurrentCellRef {
1932 fn conditional_update<T>(
1934 &self,
1935 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
1936 ) where
1937 T: VcValueType,
1938 {
1939 self.conditional_update_with_shared_reference(|old_shared_reference| {
1940 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
1941 let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
1942 Some((
1943 SharedReference::new(triomphe::Arc::new(new_value)),
1944 updated_key_hashes,
1945 content_hash,
1946 ))
1947 })
1948 }
1949
1950 fn conditional_update_with_shared_reference(
1952 &self,
1953 functor: impl FnOnce(
1954 Option<&SharedReference>,
1955 ) -> Option<(
1956 SharedReference,
1957 Option<SmallVec<[u64; 2]>>,
1958 Option<CellHash>,
1959 )>,
1960 ) {
1961 let tt = turbo_tasks();
1962 let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
1963 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1964 if let Some((update, updated_key_hashes, content_hash)) = update {
1965 tt.update_own_task_cell(
1966 self.current_task,
1967 self.index,
1968 CellContent(Some(update)),
1969 updated_key_hashes,
1970 content_hash,
1971 VerificationMode::EqualityCheck,
1972 )
1973 }
1974 }
1975
1976 pub fn compare_and_update<T>(&self, new_value: T)
2010 where
2011 T: PartialEq + VcValueType,
2012 {
2013 self.conditional_update(|old_value| {
2014 if let Some(old_value) = old_value
2015 && old_value == &new_value
2016 {
2017 return None;
2018 }
2019 Some((new_value, None, None))
2020 });
2021 }
2022
2023 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2031 where
2032 T: VcValueType + PartialEq,
2033 {
2034 self.conditional_update_with_shared_reference(|old_sr| {
2035 if let Some(old_sr) = old_sr {
2036 let old_value = extract_sr_value::<T>(old_sr);
2037 let new_value = extract_sr_value::<T>(&new_shared_reference);
2038 if old_value == new_value {
2039 return None;
2040 }
2041 }
2042 Some((new_shared_reference, None, None))
2043 });
2044 }
2045
2046 pub fn hashed_compare_and_update<T>(&self, new_value: T)
2055 where
2056 T: PartialEq + DeterministicHash + VcValueType,
2057 {
2058 self.conditional_update(|old_value| {
2059 if let Some(old_value) = old_value
2060 && old_value == &new_value
2061 {
2062 return None;
2063 }
2064 let content_hash = hash_xxh3_hash128(&new_value);
2065 Some((new_value, None, Some(content_hash)))
2066 });
2067 }
2068
2069 pub fn hashed_compare_and_update_with_shared_reference<T>(
2075 &self,
2076 new_shared_reference: SharedReference,
2077 ) where
2078 T: VcValueType + PartialEq + DeterministicHash,
2079 {
2080 self.conditional_update_with_shared_reference(move |old_sr| {
2081 if let Some(old_sr) = old_sr {
2082 let old_value = extract_sr_value::<T>(old_sr);
2083 let new_value = extract_sr_value::<T>(&new_shared_reference);
2084 if old_value == new_value {
2085 return None;
2086 }
2087 }
2088 let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2089 Some((new_shared_reference, None, Some(content_hash)))
2090 });
2091 }
2092
2093 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2095 where
2096 T: PartialEq + VcValueType,
2097 VcReadTarget<T>: KeyedEq,
2098 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2099 {
2100 self.conditional_update(|old_value| {
2101 let Some(old_value) = old_value else {
2102 return Some((new_value, None, None));
2103 };
2104 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2105 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2106 let updated_keys = old_value.different_keys(new_value_ref);
2107 if updated_keys.is_empty() {
2108 return None;
2109 }
2110 let updated_key_hashes = updated_keys
2112 .into_iter()
2113 .map(|key| FxBuildHasher.hash_one(key))
2114 .collect();
2115 Some((new_value, Some(updated_key_hashes), None))
2116 });
2117 }
2118
2119 pub fn keyed_compare_and_update_with_shared_reference<T>(
2122 &self,
2123 new_shared_reference: SharedReference,
2124 ) where
2125 T: VcValueType + PartialEq,
2126 VcReadTarget<T>: KeyedEq,
2127 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2128 {
2129 self.conditional_update_with_shared_reference(|old_sr| {
2130 let Some(old_sr) = old_sr else {
2131 return Some((new_shared_reference, None, None));
2132 };
2133 let old_value = extract_sr_value::<T>(old_sr);
2134 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2135 let new_value = extract_sr_value::<T>(&new_shared_reference);
2136 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2137 let updated_keys = old_value.different_keys(new_value);
2138 if updated_keys.is_empty() {
2139 return None;
2140 }
2141 let updated_key_hashes = updated_keys
2143 .into_iter()
2144 .map(|key| FxBuildHasher.hash_one(key))
2145 .collect();
2146 Some((new_shared_reference, Some(updated_key_hashes), None))
2147 });
2148 }
2149
2150 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2152 where
2153 T: VcValueType,
2154 {
2155 let tt = turbo_tasks();
2156 tt.update_own_task_cell(
2157 self.current_task,
2158 self.index,
2159 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2160 None,
2161 None,
2162 verification_mode,
2163 )
2164 }
2165
2166 pub fn update_with_shared_reference(
2174 &self,
2175 shared_ref: SharedReference,
2176 verification_mode: VerificationMode,
2177 ) {
2178 let tt = turbo_tasks();
2179 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2180 let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2181 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2182 shared_ref_exp != shared_ref
2184 } else {
2185 true
2186 }
2187 } else {
2188 true
2189 };
2190 if update {
2191 tt.update_own_task_cell(
2192 self.current_task,
2193 self.index,
2194 CellContent(Some(shared_ref)),
2195 None,
2196 None,
2197 verification_mode,
2198 )
2199 }
2200 }
2201}
2202
2203impl From<CurrentCellRef> for RawVc {
2204 fn from(cell: CurrentCellRef) -> Self {
2205 RawVc::TaskCell(cell.current_task, cell.index)
2206 }
2207}
2208
2209fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2210 sr.0.downcast_ref::<T>()
2211 .expect("cannot update SharedReference of different type")
2212}
2213
2214pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2215 find_cell_by_id(T::get_value_type_id())
2216}
2217
2218pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2219 CURRENT_TASK_STATE.with(|ts| {
2220 let current_task = current_task("celling turbo_tasks values");
2221 let mut ts = ts.write().unwrap();
2222 let map = ts.cell_counters.as_mut().unwrap();
2223 let current_index = map.entry(ty).or_default();
2224 let index = *current_index;
2225 *current_index += 1;
2226 CurrentCellRef {
2227 current_task,
2228 index: CellId { type_id: ty, index },
2229 }
2230 })
2231}
2232
2233pub(crate) async fn read_local_output(
2234 this: &dyn TurboTasksApi,
2235 execution_id: ExecutionId,
2236 local_task_id: LocalTaskId,
2237) -> Result<RawVc> {
2238 loop {
2239 match this.try_read_local_output(execution_id, local_task_id)? {
2240 Ok(raw_vc) => return Ok(raw_vc),
2241 Err(event_listener) => event_listener.await,
2242 }
2243 }
2244}