1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 panic::AssertUnwindSafe,
8 pin::Pin,
9 process::abort,
10 sync::{
11 Arc, Mutex, RwLock, Weak,
12 atomic::{AtomicBool, AtomicUsize, Ordering},
13 },
14 time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32 VcValueTrait, VcValueType,
33 backend::{
34 Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35 TurboTasksExecutionError, TypedCellContent, VerificationMode,
36 },
37 capture_future::CaptureFuture,
38 dyn_task_inputs::StackDynTaskInputs,
39 event::{Event, EventListener},
40 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41 id_factory::IdFactoryWithReuse,
42 keyed::KeyedEq,
43 local_task_tracker::LocalTaskTracker,
44 macro_helpers::NativeFunction,
45 message_queue::{CompilationEvent, CompilationEventQueue},
46 priority_runner::{Executor, PriorityRunner},
47 raw_vc::{CellId, RawVc},
48 registry,
49 serialization_invalidation::SerializationInvalidator,
50 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51 task_statistics::TaskStatisticsApi,
52 trace::TraceRawVcs,
53 util::{IdFactory, StaticOrArc},
54};
55
56pub trait TurboTasksCallApi: Sync + Send {
59 fn dynamic_call(
62 &self,
63 native_fn: &'static NativeFunction,
64 this: Option<RawVc>,
65 arg: &mut dyn StackDynTaskInputs,
66 persistence: TaskPersistence,
67 ) -> RawVc;
68 fn native_call(
71 &self,
72 native_fn: &'static NativeFunction,
73 this: Option<RawVc>,
74 arg: &mut dyn StackDynTaskInputs,
75 persistence: TaskPersistence,
76 ) -> RawVc;
77 fn trait_call(
80 &self,
81 trait_method: &'static TraitMethod,
82 this: RawVc,
83 arg: &mut dyn StackDynTaskInputs,
84 persistence: TaskPersistence,
85 ) -> RawVc;
86
87 fn run(
88 &self,
89 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
90 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
91 fn run_once(
92 &self,
93 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
94 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
95 fn run_once_with_reason(
96 &self,
97 reason: StaticOrArc<dyn InvalidationReason>,
98 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
99 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
100 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
101
102 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
104
105 fn get_task_name(&self, task: TaskId) -> String;
107}
108
109pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
115 fn invalidate(&self, task: TaskId);
116 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
117
118 fn invalidate_serialization(&self, task: TaskId);
119
120 fn try_read_task_output(
121 &self,
122 task: TaskId,
123 options: ReadOutputOptions,
124 ) -> Result<Result<RawVc, EventListener>>;
125
126 fn try_read_task_cell(
127 &self,
128 task: TaskId,
129 index: CellId,
130 options: ReadCellOptions,
131 ) -> Result<Result<TypedCellContent, EventListener>>;
132
133 fn try_read_local_output(
148 &self,
149 execution_id: ExecutionId,
150 local_task_id: LocalTaskId,
151 ) -> Result<Result<RawVc, EventListener>>;
152
153 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
154
155 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
156 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
157 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
158
159 fn try_read_own_task_cell(
162 &self,
163 current_task: TaskId,
164 index: CellId,
165 ) -> Result<TypedCellContent>;
166
167 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
168 fn update_own_task_cell(
169 &self,
170 task: TaskId,
171 index: CellId,
172 content: CellContent,
173 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
174 content_hash: Option<CellHash>,
175 verification_mode: VerificationMode,
176 );
177 fn mark_own_task_as_finished(&self, task: TaskId);
178
179 fn connect_task(&self, task: TaskId);
180
181 fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
186
187 fn task_statistics(&self) -> &TaskStatisticsApi;
188
189 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
190
191 fn subscribe_to_compilation_events(
192 &self,
193 event_types: Option<Vec<String>>,
194 ) -> Receiver<Arc<dyn CompilationEvent>>;
195
196 fn is_tracking_dependencies(&self) -> bool;
198}
199
200pub struct Unused<T> {
202 inner: T,
203}
204
205impl<T> Unused<T> {
206 pub unsafe fn new_unchecked(inner: T) -> Self {
212 Self { inner }
213 }
214
215 pub unsafe fn get_unchecked(&self) -> &T {
221 &self.inner
222 }
223
224 pub fn into(self) -> T {
226 self.inner
227 }
228}
229
230pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
232 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
233
234 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
235 fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
236 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
240 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
244
245 fn schedule(&self, task: TaskId, priority: TaskPriority);
247
248 fn get_current_task_priority(&self) -> TaskPriority;
250
251 fn schedule_backend_foreground_job(&self, job: B::BackendJob);
253
254 fn schedule_backend_background_job(&self, job: B::BackendJob);
259
260 fn program_duration_until(&self, instant: Instant) -> Duration;
262
263 fn is_idle(&self) -> bool;
265
266 fn backend(&self) -> &B;
268}
269
270#[allow(clippy::manual_non_exhaustive)]
271pub struct UpdateInfo {
272 pub duration: Duration,
273 pub tasks: usize,
274 pub reasons: InvalidationReasonSet,
275 #[allow(dead_code)]
276 placeholder_for_future_fields: (),
277}
278
279#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
280pub enum TaskPersistence {
281 Persistent,
283
284 Transient,
291}
292
293impl Display for TaskPersistence {
294 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295 match self {
296 TaskPersistence::Persistent => write!(f, "persistent"),
297 TaskPersistence::Transient => write!(f, "transient"),
298 }
299 }
300}
301
302#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
303pub enum ReadConsistency {
304 #[default]
307 Eventual,
308 Strong,
313}
314
315#[derive(Clone, Copy, Debug, Eq, PartialEq)]
316pub enum ReadCellTracking {
317 Tracked {
319 key: Option<u64>,
321 },
322 TrackOnlyError,
327 Untracked,
332}
333
334impl ReadCellTracking {
335 pub fn should_track(&self, is_err: bool) -> bool {
336 match self {
337 ReadCellTracking::Tracked { .. } => true,
338 ReadCellTracking::TrackOnlyError => is_err,
339 ReadCellTracking::Untracked => false,
340 }
341 }
342
343 pub fn key(&self) -> Option<u64> {
344 match self {
345 ReadCellTracking::Tracked { key } => *key,
346 ReadCellTracking::TrackOnlyError => None,
347 ReadCellTracking::Untracked => None,
348 }
349 }
350}
351
352impl Default for ReadCellTracking {
353 fn default() -> Self {
354 ReadCellTracking::Tracked { key: None }
355 }
356}
357
358impl Display for ReadCellTracking {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 match self {
361 ReadCellTracking::Tracked { key: None } => write!(f, "tracked"),
362 ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
363 ReadCellTracking::TrackOnlyError => write!(f, "track only error"),
364 ReadCellTracking::Untracked => write!(f, "untracked"),
365 }
366 }
367}
368
369#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
370pub enum ReadTracking {
371 #[default]
373 Tracked,
374 TrackOnlyError,
379 Untracked,
384}
385
386impl ReadTracking {
387 pub fn should_track(&self, is_err: bool) -> bool {
388 match self {
389 ReadTracking::Tracked => true,
390 ReadTracking::TrackOnlyError => is_err,
391 ReadTracking::Untracked => false,
392 }
393 }
394}
395
396impl Display for ReadTracking {
397 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
398 match self {
399 ReadTracking::Tracked => write!(f, "tracked"),
400 ReadTracking::TrackOnlyError => write!(f, "track only error"),
401 ReadTracking::Untracked => write!(f, "untracked"),
402 }
403 }
404}
405
406#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
407pub enum TaskPriority {
408 #[default]
409 Initial,
410 Invalidation {
411 priority: Reverse<u32>,
412 },
413}
414
415impl TaskPriority {
416 pub fn invalidation(priority: u32) -> Self {
417 Self::Invalidation {
418 priority: Reverse(priority),
419 }
420 }
421
422 pub fn initial() -> Self {
423 Self::Initial
424 }
425
426 pub fn leaf() -> Self {
427 Self::Invalidation {
428 priority: Reverse(0),
429 }
430 }
431
432 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
433 match self {
434 TaskPriority::Initial => parent_priority,
435 TaskPriority::Invalidation { priority } => {
436 if let TaskPriority::Invalidation {
437 priority: parent_priority,
438 } = parent_priority
439 && priority.0 < parent_priority.0
440 {
441 Self::Invalidation {
442 priority: Reverse(parent_priority.0.saturating_add(1)),
443 }
444 } else {
445 *self
446 }
447 }
448 }
449 }
450}
451
452impl Display for TaskPriority {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 match self {
455 TaskPriority::Initial => write!(f, "initial"),
456 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
457 }
458 }
459}
460
461enum ScheduledTask {
462 Task {
463 task_id: TaskId,
464 span: Span,
465 },
466 LocalTask {
467 ty: LocalTaskSpec,
468 persistence: TaskPersistence,
469 local_task_id: LocalTaskId,
470 global_task_state: Arc<RwLock<CurrentTaskState>>,
471 span: Span,
472 },
473}
474
475pub struct TurboTasks<B: Backend + 'static> {
476 this: Weak<Self>,
477 backend: B,
478 task_id_factory: IdFactoryWithReuse<TaskId>,
479 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
480 execution_id_factory: IdFactory<ExecutionId>,
481 stopped: AtomicBool,
482 currently_scheduled_foreground_jobs: AtomicUsize,
483 currently_scheduled_background_jobs: AtomicUsize,
484 scheduled_tasks: AtomicUsize,
485 priority_runner:
486 Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
487 start: Mutex<Option<Instant>>,
488 aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
489 event_foreground_start: Event,
491 event_foreground_done: Event,
494 event_background_done: Event,
496 program_start: Instant,
497 compilation_events: CompilationEventQueue,
498}
499
500struct CurrentTaskState {
509 task_id: Option<TaskId>,
510 execution_id: ExecutionId,
511 priority: TaskPriority,
512
513 #[cfg(feature = "verify_determinism")]
516 stateful: bool,
517
518 has_invalidator: bool,
520
521 in_top_level_task: bool,
524
525 cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
530
531 local_tasks: LocalTaskTracker,
534}
535
536impl CurrentTaskState {
537 fn new(
538 task_id: TaskId,
539 execution_id: ExecutionId,
540 priority: TaskPriority,
541 in_top_level_task: bool,
542 ) -> Self {
543 Self {
544 task_id: Some(task_id),
545 execution_id,
546 priority,
547 #[cfg(feature = "verify_determinism")]
548 stateful: false,
549 has_invalidator: false,
550 in_top_level_task,
551 cell_counters: Some(AutoMap::default()),
552 local_tasks: LocalTaskTracker::new(),
553 }
554 }
555
556 fn new_temporary(
557 execution_id: ExecutionId,
558 priority: TaskPriority,
559 in_top_level_task: bool,
560 ) -> Self {
561 Self {
562 task_id: None,
563 execution_id,
564 priority,
565 #[cfg(feature = "verify_determinism")]
566 stateful: false,
567 has_invalidator: false,
568 in_top_level_task,
569 cell_counters: None,
570 local_tasks: LocalTaskTracker::new(),
571 }
572 }
573
574 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
575 if self.execution_id != expected_execution_id {
576 panic!(
577 "Local tasks can only be scheduled/awaited within the same execution of the \
578 parent task that created them"
579 );
580 }
581 }
582}
583
584task_local! {
586 static TURBO_TASKS: Arc<dyn TurboTasksApi>;
588
589 static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
590
591 pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
596}
597
598impl<B: Backend + 'static> TurboTasks<B> {
599 pub fn new(backend: B) -> Arc<Self> {
605 let task_id_factory = IdFactoryWithReuse::new(
606 TaskId::MIN,
607 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
608 );
609 let transient_task_id_factory =
610 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
611 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
612 let this = Arc::new_cyclic(|this| Self {
613 this: this.clone(),
614 backend,
615 task_id_factory,
616 transient_task_id_factory,
617 execution_id_factory,
618 stopped: AtomicBool::new(false),
619 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
620 currently_scheduled_background_jobs: AtomicUsize::new(0),
621 scheduled_tasks: AtomicUsize::new(0),
622 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
623 start: Default::default(),
624 aggregated_update: Default::default(),
625 event_foreground_done: Event::new(|| {
626 || "TurboTasks::event_foreground_done".to_string()
627 }),
628 event_foreground_start: Event::new(|| {
629 || "TurboTasks::event_foreground_start".to_string()
630 }),
631 event_background_done: Event::new(|| {
632 || "TurboTasks::event_background_done".to_string()
633 }),
634 program_start: Instant::now(),
635 compilation_events: CompilationEventQueue::default(),
636 });
637 this.backend.startup(&*this);
638 this
639 }
640
641 pub fn pin(&self) -> Arc<Self> {
642 self.this.upgrade().unwrap()
643 }
644
645 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
647 where
648 T: ?Sized,
649 F: Fn() -> Fut + Send + Sync + Clone + 'static,
650 Fut: Future<Output = Result<Vc<T>>> + Send,
651 {
652 let id = self.backend.create_transient_task(
653 TransientTaskType::Root(Box::new(move || {
654 let functor = functor.clone();
655 Box::pin(async move {
656 mark_top_level_task();
657 let raw_vc = functor().await?.node;
658 raw_vc.to_non_local().await
659 })
660 })),
661 self,
662 );
663 self.schedule(id, TaskPriority::initial());
664 id
665 }
666
667 pub fn dispose_root_task(&self, task_id: TaskId) {
668 self.backend.dispose_root_task(task_id, self);
669 }
670
671 #[track_caller]
675 fn spawn_once_task<T, Fut>(&self, future: Fut)
676 where
677 T: ?Sized,
678 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
679 {
680 let id = self.backend.create_transient_task(
681 TransientTaskType::Once(Box::pin(async move {
682 mark_top_level_task();
683 let raw_vc = future.await?.node;
684 raw_vc.to_non_local().await
685 })),
686 self,
687 );
688 self.schedule(id, TaskPriority::initial());
689 }
690
691 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
692 &self,
693 future: impl Future<Output = Result<T>> + Send + 'static,
694 ) -> Result<T> {
695 let (tx, rx) = tokio::sync::oneshot::channel();
696 self.spawn_once_task(async move {
697 mark_top_level_task();
698 let result = future.await;
699 tx.send(result)
700 .map_err(|_| anyhow!("unable to send result"))?;
701 Ok(Completion::new())
702 });
703
704 rx.await?
705 }
706
707 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
708 pub async fn run<T: TraceRawVcs + Send + 'static>(
709 &self,
710 future: impl Future<Output = Result<T>> + Send + 'static,
711 ) -> Result<T, TurboTasksExecutionError> {
712 self.begin_foreground_job();
713 let execution_id = self.execution_id_factory.wrapping_get();
715 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
716 execution_id,
717 TaskPriority::initial(),
718 true, )));
720
721 let result = TURBO_TASKS
722 .scope(
723 self.pin(),
724 CURRENT_TASK_STATE.scope(current_task_state, async {
725 let result = CaptureFuture::new(future).await;
726
727 wait_for_local_tasks().await;
729
730 match result {
731 Ok(Ok(value)) => Ok(value),
732 Ok(Err(err)) => Err(err.into()),
733 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
734 }
735 }),
736 )
737 .await;
738 self.finish_foreground_job();
739 result
740 }
741
742 pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
743 let this = self.pin();
744 tokio::spawn(async move {
745 this.pin()
746 .run_once(async move {
747 this.finish_foreground_job();
748 future.await;
749 this.begin_foreground_job();
750 Ok(())
751 })
752 .await
753 .unwrap()
754 });
755 }
756
757 pub(crate) fn native_call(
758 &self,
759 native_fn: &'static NativeFunction,
760 this: Option<RawVc>,
761 arg: &mut dyn StackDynTaskInputs,
762 persistence: TaskPersistence,
763 ) -> RawVc {
764 RawVc::TaskOutput(self.backend.get_or_create_task(
765 native_fn,
766 this,
767 arg,
768 current_task_if_available("turbo_function calls"),
769 persistence,
770 self,
771 ))
772 }
773
774 pub fn dynamic_call(
775 &self,
776 native_fn: &'static NativeFunction,
777 this: Option<RawVc>,
778 arg: &mut dyn StackDynTaskInputs,
779 persistence: TaskPersistence,
780 ) -> RawVc {
781 if this.is_none_or(|this| this.is_resolved())
782 && native_fn.arg_meta.is_resolved(arg.as_ref())
783 {
784 return self.native_call(native_fn, this, arg, persistence);
785 }
786 let arg = arg.take_box();
788 let task_type = LocalTaskSpec {
789 task_type: LocalTaskType::ResolveNative { native_fn },
790 this,
791 arg,
792 };
793 self.schedule_local_task(task_type, persistence)
794 }
795
796 pub fn trait_call(
797 &self,
798 trait_method: &'static TraitMethod,
799 this: RawVc,
800 arg: &mut dyn StackDynTaskInputs,
801 persistence: TaskPersistence,
802 ) -> RawVc {
803 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
807 match registry::get_value_type(type_id).get_trait_method(trait_method) {
808 Some(native_fn) => {
809 if let Some(filter) = native_fn.arg_meta.filter_owned {
810 let mut arg = (filter)(arg);
811 return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
812 } else {
813 return self.dynamic_call(native_fn, Some(this), arg, persistence);
814 }
815 }
816 None => {
817 }
821 }
822 }
823
824 let task_type = LocalTaskSpec {
826 task_type: LocalTaskType::ResolveTrait { trait_method },
827 this: Some(this),
828 arg: arg.take_box(),
829 };
830
831 self.schedule_local_task(task_type, persistence)
832 }
833
834 #[track_caller]
835 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
836 self.begin_foreground_job();
837 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
838
839 self.priority_runner.schedule(
840 &self.pin(),
841 ScheduledTask::Task {
842 task_id,
843 span: Span::current(),
844 },
845 priority,
846 );
847 }
848
849 fn schedule_local_task(
850 &self,
851 ty: LocalTaskSpec,
852 persistence: TaskPersistence,
854 ) -> RawVc {
855 let task_type = ty.task_type;
856 let (global_task_state, execution_id, priority, local_task_id) =
857 CURRENT_TASK_STATE.with(|gts| {
858 let mut gts_write = gts.write().unwrap();
859 let local_task_id = gts_write.local_tasks.create(task_type);
860 (
861 Arc::clone(gts),
862 gts_write.execution_id,
863 gts_write.priority,
864 local_task_id,
865 )
866 });
867
868 self.priority_runner.schedule(
869 &self.pin(),
870 ScheduledTask::LocalTask {
871 ty,
872 persistence,
873 local_task_id,
874 global_task_state,
875 span: Span::current(),
876 },
877 priority,
878 );
879
880 RawVc::LocalOutput(execution_id, local_task_id, persistence)
881 }
882
883 fn begin_foreground_job(&self) {
884 if self
885 .currently_scheduled_foreground_jobs
886 .fetch_add(1, Ordering::AcqRel)
887 == 0
888 {
889 *self.start.lock().unwrap() = Some(Instant::now());
890 self.event_foreground_start.notify(usize::MAX);
891 self.backend.idle_end(self);
892 }
893 }
894
895 fn finish_foreground_job(&self) {
896 if self
897 .currently_scheduled_foreground_jobs
898 .fetch_sub(1, Ordering::AcqRel)
899 == 1
900 {
901 self.backend.idle_start(self);
902 let total = self.scheduled_tasks.load(Ordering::Acquire);
905 self.scheduled_tasks.store(0, Ordering::Release);
906 if let Some(start) = *self.start.lock().unwrap() {
907 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
908 if let Some(update) = update.as_mut() {
909 update.0 += start.elapsed();
910 update.1 += total;
911 } else {
912 *update = Some((start.elapsed(), total));
913 }
914 }
915 self.event_foreground_done.notify(usize::MAX);
916 }
917 }
918
919 fn begin_background_job(&self) {
920 self.currently_scheduled_background_jobs
921 .fetch_add(1, Ordering::Relaxed);
922 }
923
924 fn finish_background_job(&self) {
925 if self
926 .currently_scheduled_background_jobs
927 .fetch_sub(1, Ordering::Relaxed)
928 == 1
929 {
930 self.event_background_done.notify(usize::MAX);
931 }
932 }
933
934 pub fn get_in_progress_count(&self) -> usize {
935 self.currently_scheduled_foreground_jobs
936 .load(Ordering::Acquire)
937 }
938
939 pub async fn wait_task_completion(
951 &self,
952 id: TaskId,
953 consistency: ReadConsistency,
954 ) -> Result<()> {
955 read_task_output(
956 self,
957 id,
958 ReadOutputOptions {
959 tracking: ReadTracking::Untracked,
961 consistency,
962 },
963 )
964 .await?;
965 Ok(())
966 }
967
968 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
971 self.aggregated_update_info(aggregation, Duration::MAX)
972 .await
973 .unwrap()
974 }
975
976 pub async fn aggregated_update_info(
980 &self,
981 aggregation: Duration,
982 timeout: Duration,
983 ) -> Option<UpdateInfo> {
984 let listener = self
985 .event_foreground_done
986 .listen_with_note(|| || "wait for update info".to_string());
987 let wait_for_finish = {
988 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
989 if aggregation.is_zero() {
990 if let Some((duration, tasks)) = update.take() {
991 return Some(UpdateInfo {
992 duration,
993 tasks,
994 reasons: take(reason_set),
995 placeholder_for_future_fields: (),
996 });
997 } else {
998 true
999 }
1000 } else {
1001 update.is_none()
1002 }
1003 };
1004 if wait_for_finish {
1005 if timeout == Duration::MAX {
1006 listener.await;
1008 } else {
1009 let start_listener = self
1011 .event_foreground_start
1012 .listen_with_note(|| || "wait for update info".to_string());
1013 if self
1014 .currently_scheduled_foreground_jobs
1015 .load(Ordering::Acquire)
1016 == 0
1017 {
1018 start_listener.await;
1019 } else {
1020 drop(start_listener);
1021 }
1022 if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1023 return None;
1025 }
1026 }
1027 }
1028 if !aggregation.is_zero() {
1029 loop {
1030 select! {
1031 () = tokio::time::sleep(aggregation) => {
1032 break;
1033 }
1034 () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1035 }
1037 }
1038 }
1039 }
1040 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1041 if let Some((duration, tasks)) = update.take() {
1042 Some(UpdateInfo {
1043 duration,
1044 tasks,
1045 reasons: take(reason_set),
1046 placeholder_for_future_fields: (),
1047 })
1048 } else {
1049 panic!("aggregated_update_info must not called concurrently")
1050 }
1051 }
1052
1053 pub async fn wait_background_done(&self) {
1054 let listener = self.event_background_done.listen();
1055 if self
1056 .currently_scheduled_background_jobs
1057 .load(Ordering::Acquire)
1058 != 0
1059 {
1060 listener.await;
1061 }
1062 }
1063
1064 pub async fn stop_and_wait(&self) {
1065 turbo_tasks_future_scope(self.pin(), async move {
1066 self.backend.stopping(self);
1067 self.stopped.store(true, Ordering::Release);
1068 {
1069 let listener = self
1070 .event_foreground_done
1071 .listen_with_note(|| || "wait for stop".to_string());
1072 if self
1073 .currently_scheduled_foreground_jobs
1074 .load(Ordering::Acquire)
1075 != 0
1076 {
1077 listener.await;
1078 }
1079 }
1080 {
1081 let listener = self.event_background_done.listen();
1082 if self
1083 .currently_scheduled_background_jobs
1084 .load(Ordering::Acquire)
1085 != 0
1086 {
1087 listener.await;
1088 }
1089 }
1090 self.backend.stop(self);
1091 })
1092 .await;
1093 }
1094
1095 #[track_caller]
1096 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1097 where
1098 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1099 T::CallOnceFuture: Send,
1100 {
1101 let mut this = self.pin();
1102 this.begin_foreground_job();
1103 tokio::spawn(
1104 TURBO_TASKS
1105 .scope(this.clone(), async move {
1106 if !this.stopped.load(Ordering::Acquire) {
1107 this = func(this.clone()).await;
1108 }
1109 this.finish_foreground_job();
1110 })
1111 .in_current_span(),
1112 );
1113 }
1114
1115 #[track_caller]
1116 pub(crate) fn schedule_background_job<T>(&self, func: T)
1117 where
1118 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1119 T::CallOnceFuture: Send,
1120 {
1121 let mut this = self.pin();
1122 self.begin_background_job();
1123 tokio::spawn(
1124 TURBO_TASKS
1125 .scope(this.clone(), async move {
1126 if !this.stopped.load(Ordering::Acquire) {
1127 this = func(this).await;
1128 }
1129 this.finish_background_job();
1130 })
1131 .in_current_span(),
1132 );
1133 }
1134
1135 fn finish_current_task_state(&self) -> FinishedTaskState {
1136 CURRENT_TASK_STATE.with(|cell| {
1137 let current_task_state = &*cell.write().unwrap();
1138 FinishedTaskState {
1139 #[cfg(feature = "verify_determinism")]
1140 stateful: current_task_state.stateful,
1141 has_invalidator: current_task_state.has_invalidator,
1142 }
1143 })
1144 }
1145
1146 pub fn backend(&self) -> &B {
1147 &self.backend
1148 }
1149}
1150
1151struct TurboTasksExecutor;
1152
1153async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1158 match AssertUnwindSafe(f).catch_unwind().await {
1159 Ok(r) => r,
1160 Err(_) => {
1161 eprintln!(
1162 "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1163 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1164 https://github.com/vercel/next.js/discussions and include the panic message \
1165 and stack trace above.\n\nAborting."
1166 );
1167 abort();
1168 }
1169 }
1170}
1171
1172impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
1173 type Future = impl Future<Output = ()> + Send + 'static;
1174
1175 fn execute(
1176 &self,
1177 this: &Arc<TurboTasks<B>>,
1178 scheduled_task: ScheduledTask,
1179 priority: TaskPriority,
1180 ) -> Self::Future {
1181 match scheduled_task {
1182 ScheduledTask::Task { task_id, span } => {
1183 let this2 = this.clone();
1184 let this = this.clone();
1185 let future = async move {
1186 abort_on_panic(async {
1187 let execution_id = this.execution_id_factory.wrapping_get();
1190 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1191 task_id,
1192 execution_id,
1193 priority,
1194 false, )));
1196 let single_execution_future = async {
1197 if this.stopped.load(Ordering::Acquire) {
1198 this.backend.task_execution_canceled(task_id, &*this);
1199 return None;
1200 }
1201
1202 let TaskExecutionSpec { future, span } = this
1203 .backend
1204 .try_start_task_execution(task_id, priority, &*this)?;
1205
1206 async {
1207 let result = CaptureFuture::new(future).await;
1208
1209 wait_for_local_tasks().await;
1211
1212 let result = match result {
1213 Ok(Ok(raw_vc)) => {
1214 raw_vc
1217 .to_non_local_unchecked_sync(&*this)
1218 .map_err(|err| err.into())
1219 }
1220 Ok(Err(err)) => Err(err.into()),
1221 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
1222 };
1223
1224 let finished_state = this.finish_current_task_state();
1225 let cell_counters = CURRENT_TASK_STATE
1226 .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
1227 this.backend.task_execution_completed(
1228 task_id,
1229 result,
1230 &cell_counters,
1231 #[cfg(feature = "verify_determinism")]
1232 finished_state.stateful,
1233 finished_state.has_invalidator,
1234 &*this,
1235 )
1236 }
1237 .instrument(span)
1238 .await
1239 };
1240 if let Some(stale_priority) = CURRENT_TASK_STATE
1241 .scope(current_task_state, single_execution_future)
1242 .await
1243 {
1244 this.schedule(task_id, stale_priority);
1247 }
1248 this.finish_foreground_job();
1249 })
1250 .await
1251 };
1252
1253 Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
1254 }
1255 ScheduledTask::LocalTask {
1256 ty,
1257 persistence,
1258 local_task_id,
1259 global_task_state,
1260 span,
1261 } => {
1262 let this2 = this.clone();
1263 let this = this.clone();
1264 let task_type = ty.task_type;
1265 let future = async move {
1266 let span = match &ty.task_type {
1267 LocalTaskType::ResolveNative { native_fn } => {
1268 native_fn.resolve_span(priority)
1269 }
1270 LocalTaskType::ResolveTrait { trait_method } => {
1271 trait_method.resolve_span(priority)
1272 }
1273 };
1274 abort_on_panic(
1275 async move {
1276 let result = match ty.task_type {
1277 LocalTaskType::ResolveNative { native_fn } => {
1278 LocalTaskType::run_resolve_native(
1279 native_fn,
1280 ty.this,
1281 &*ty.arg,
1282 persistence,
1283 this,
1284 )
1285 .await
1286 }
1287 LocalTaskType::ResolveTrait { trait_method } => {
1288 LocalTaskType::run_resolve_trait(
1289 trait_method,
1290 ty.this.unwrap(),
1291 &*ty.arg,
1292 persistence,
1293 this,
1294 )
1295 .await
1296 }
1297 };
1298
1299 let output = match result {
1300 Ok(raw_vc) => OutputContent::Link(raw_vc),
1301 Err(err) => OutputContent::Error(
1302 TurboTasksExecutionError::from(err)
1303 .with_local_task_context(task_type.to_string()),
1304 ),
1305 };
1306
1307 CURRENT_TASK_STATE.with(move |gts| {
1308 gts.write()
1309 .unwrap()
1310 .local_tasks
1311 .complete(local_task_id, output);
1312 });
1313 }
1314 .instrument(span),
1315 )
1316 .await
1317 };
1318 let future = CURRENT_TASK_STATE.scope(global_task_state, future);
1319
1320 Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
1321 }
1322 }
1323 }
1324}
1325
1326struct FinishedTaskState {
1327 #[cfg(feature = "verify_determinism")]
1330 stateful: bool,
1331
1332 has_invalidator: bool,
1334}
1335
1336impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1337 fn dynamic_call(
1338 &self,
1339 native_fn: &'static NativeFunction,
1340 this: Option<RawVc>,
1341 arg: &mut dyn StackDynTaskInputs,
1342 persistence: TaskPersistence,
1343 ) -> RawVc {
1344 self.dynamic_call(native_fn, this, arg, persistence)
1345 }
1346 fn native_call(
1347 &self,
1348 native_fn: &'static NativeFunction,
1349 this: Option<RawVc>,
1350 arg: &mut dyn StackDynTaskInputs,
1351 persistence: TaskPersistence,
1352 ) -> RawVc {
1353 self.native_call(native_fn, this, arg, persistence)
1354 }
1355 fn trait_call(
1356 &self,
1357 trait_method: &'static TraitMethod,
1358 this: RawVc,
1359 arg: &mut dyn StackDynTaskInputs,
1360 persistence: TaskPersistence,
1361 ) -> RawVc {
1362 self.trait_call(trait_method, this, arg, persistence)
1363 }
1364
1365 #[track_caller]
1366 fn run(
1367 &self,
1368 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1369 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1370 let this = self.pin();
1371 Box::pin(async move { this.run(future).await })
1372 }
1373
1374 #[track_caller]
1375 fn run_once(
1376 &self,
1377 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1378 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1379 let this = self.pin();
1380 Box::pin(async move { this.run_once(future).await })
1381 }
1382
1383 #[track_caller]
1384 fn run_once_with_reason(
1385 &self,
1386 reason: StaticOrArc<dyn InvalidationReason>,
1387 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1388 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1389 {
1390 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1391 reason_set.insert(reason);
1392 }
1393 let this = self.pin();
1394 Box::pin(async move { this.run_once(future).await })
1395 }
1396
1397 #[track_caller]
1398 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1399 self.start_once_process(future)
1400 }
1401
1402 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1403 if let Err(e) = self.compilation_events.send(event) {
1404 tracing::warn!("Failed to send compilation event: {e}");
1405 }
1406 }
1407
1408 fn get_task_name(&self, task: TaskId) -> String {
1409 self.backend.get_task_name(task, self)
1410 }
1411}
1412
1413impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1414 #[instrument(level = "info", skip_all, name = "invalidate")]
1415 fn invalidate(&self, task: TaskId) {
1416 self.backend.invalidate_task(task, self);
1417 }
1418
1419 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1420 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1421 {
1422 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1423 reason_set.insert(reason);
1424 }
1425 self.backend.invalidate_task(task, self);
1426 }
1427
1428 fn invalidate_serialization(&self, task: TaskId) {
1429 self.backend.invalidate_serialization(task, self);
1430 }
1431
1432 #[track_caller]
1433 fn try_read_task_output(
1434 &self,
1435 task: TaskId,
1436 options: ReadOutputOptions,
1437 ) -> Result<Result<RawVc, EventListener>> {
1438 if options.consistency == ReadConsistency::Eventual {
1439 debug_assert_not_in_top_level_task("read_task_output");
1440 }
1441 self.backend.try_read_task_output(
1442 task,
1443 current_task_if_available("reading Vcs"),
1444 options,
1445 self,
1446 )
1447 }
1448
1449 #[track_caller]
1450 fn try_read_task_cell(
1451 &self,
1452 task: TaskId,
1453 index: CellId,
1454 options: ReadCellOptions,
1455 ) -> Result<Result<TypedCellContent, EventListener>> {
1456 let reader = current_task_if_available("reading Vcs");
1457 self.backend
1458 .try_read_task_cell(task, index, reader, options, self)
1459 }
1460
1461 fn try_read_own_task_cell(
1462 &self,
1463 current_task: TaskId,
1464 index: CellId,
1465 ) -> Result<TypedCellContent> {
1466 self.backend
1467 .try_read_own_task_cell(current_task, index, self)
1468 }
1469
1470 #[track_caller]
1471 fn try_read_local_output(
1472 &self,
1473 execution_id: ExecutionId,
1474 local_task_id: LocalTaskId,
1475 ) -> Result<Result<RawVc, EventListener>> {
1476 debug_assert_not_in_top_level_task("read_local_output");
1477 CURRENT_TASK_STATE.with(|gts| {
1478 let gts_read = gts.read().unwrap();
1479
1480 gts_read.assert_execution_id(execution_id);
1485
1486 match gts_read.local_tasks.get(local_task_id) {
1487 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1488 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1489 }
1490 })
1491 }
1492
1493 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1494 self.backend.read_task_collectibles(
1497 task,
1498 trait_id,
1499 current_task_if_available("reading collectibles"),
1500 self,
1501 )
1502 }
1503
1504 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1505 self.backend.emit_collectible(
1506 trait_type,
1507 collectible,
1508 current_task("emitting collectible"),
1509 self,
1510 );
1511 }
1512
1513 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1514 self.backend.unemit_collectible(
1515 trait_type,
1516 collectible,
1517 count,
1518 current_task("emitting collectible"),
1519 self,
1520 );
1521 }
1522
1523 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1524 for (&collectible, &count) in collectibles {
1525 if count > 0 {
1526 self.backend.unemit_collectible(
1527 trait_type,
1528 collectible,
1529 count as u32,
1530 current_task("emitting collectible"),
1531 self,
1532 );
1533 }
1534 }
1535 }
1536
1537 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1538 self.try_read_own_task_cell(task, index)
1539 }
1540
1541 fn update_own_task_cell(
1542 &self,
1543 task: TaskId,
1544 index: CellId,
1545 content: CellContent,
1546 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1547 content_hash: Option<CellHash>,
1548 verification_mode: VerificationMode,
1549 ) {
1550 self.backend.update_task_cell(
1551 task,
1552 index,
1553 content,
1554 updated_key_hashes,
1555 content_hash,
1556 verification_mode,
1557 self,
1558 );
1559 }
1560
1561 fn connect_task(&self, task: TaskId) {
1562 self.backend
1563 .connect_task(task, current_task_if_available("connecting task"), self);
1564 }
1565
1566 fn mark_own_task_as_finished(&self, task: TaskId) {
1567 self.backend.mark_own_task_as_finished(task, self);
1568 }
1569
1570 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1573 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1576 global_task_state
1577 .write()
1578 .unwrap()
1579 .local_tasks
1580 .register_detached();
1581 let wrapped = async move {
1582 struct DropGuard;
1584 impl Drop for DropGuard {
1585 fn drop(&mut self) {
1586 CURRENT_TASK_STATE
1587 .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1588 }
1589 }
1590 let _guard = DropGuard;
1591 fut.await;
1592 };
1593 tokio::spawn(TURBO_TASKS.scope(
1594 turbo_tasks(),
1595 CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1596 ));
1597 }
1598
1599 fn task_statistics(&self) -> &TaskStatisticsApi {
1600 self.backend.task_statistics()
1601 }
1602
1603 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1604 let this = self.pin();
1605 Box::pin(async move {
1606 this.stop_and_wait().await;
1607 })
1608 }
1609
1610 fn subscribe_to_compilation_events(
1611 &self,
1612 event_types: Option<Vec<String>>,
1613 ) -> Receiver<Arc<dyn CompilationEvent>> {
1614 self.compilation_events.subscribe(event_types)
1615 }
1616
1617 fn is_tracking_dependencies(&self) -> bool {
1618 self.backend.is_tracking_dependencies()
1619 }
1620}
1621
1622impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1623 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1624 self.pin()
1625 }
1626 fn backend(&self) -> &B {
1627 &self.backend
1628 }
1629
1630 #[track_caller]
1631 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1632 self.schedule_background_job(async move |this| {
1633 this.backend.run_backend_job(job, &*this).await;
1634 this
1635 })
1636 }
1637
1638 #[track_caller]
1639 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1640 self.schedule_foreground_job(async move |this| {
1641 this.backend.run_backend_job(job, &*this).await;
1642 this
1643 })
1644 }
1645
1646 #[track_caller]
1647 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1648 self.schedule(task, priority)
1649 }
1650
1651 fn get_current_task_priority(&self) -> TaskPriority {
1652 CURRENT_TASK_STATE
1653 .try_with(|task_state| task_state.read().unwrap().priority)
1654 .unwrap_or(TaskPriority::initial())
1655 }
1656
1657 fn program_duration_until(&self, instant: Instant) -> Duration {
1658 instant - self.program_start
1659 }
1660
1661 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1662 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1664 }
1665
1666 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1667 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1669 }
1670
1671 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1672 unsafe { self.task_id_factory.reuse(id.into()) }
1673 }
1674
1675 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1676 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1677 }
1678
1679 fn is_idle(&self) -> bool {
1680 self.currently_scheduled_foreground_jobs
1681 .load(Ordering::Acquire)
1682 == 0
1683 }
1684}
1685
1686async fn wait_for_local_tasks() {
1687 let listener =
1688 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1689 let Some(listener) = listener else {
1690 return;
1691 };
1692 listener.await;
1693}
1694
1695pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1696 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1697 Ok(id) => id,
1698 Err(_) => panic!(
1699 "{from} can only be used in the context of a turbo_tasks task execution or \
1700 turbo_tasks run"
1701 ),
1702 }
1703}
1704
1705pub(crate) fn current_task(from: &str) -> TaskId {
1706 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1707 Ok(Some(id)) => id,
1708 Ok(None) | Err(_) => {
1709 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1710 }
1711 }
1712}
1713
1714#[track_caller]
1717pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1718 if !cfg!(debug_assertions) {
1719 return;
1720 }
1721
1722 let in_top_level = CURRENT_TASK_STATE
1723 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1724 .unwrap_or(true);
1725 if !in_top_level {
1726 panic!("{message}");
1727 }
1728}
1729
1730#[track_caller]
1731pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1732 if !cfg!(debug_assertions) {
1733 return;
1734 }
1735
1736 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1739 .try_with(|&suppressed| suppressed)
1740 .unwrap_or(false);
1741 if suppressed {
1742 return;
1743 }
1744
1745 let in_top_level = CURRENT_TASK_STATE
1746 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1747 .unwrap_or(false);
1748 if in_top_level {
1749 panic!(
1750 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1751 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1752 reads to avoid leaking inconsistent return values."
1753 );
1754 }
1755}
1756
1757pub async fn run<T: Send + 'static>(
1758 tt: Arc<dyn TurboTasksApi>,
1759 future: impl Future<Output = Result<T>> + Send + 'static,
1760) -> Result<T> {
1761 let (tx, rx) = tokio::sync::oneshot::channel();
1762
1763 tt.run(Box::pin(async move {
1764 let result = future.await?;
1765 tx.send(result)
1766 .map_err(|_| anyhow!("unable to send result"))?;
1767 Ok(())
1768 }))
1769 .await?;
1770
1771 Ok(rx.await?)
1772}
1773
1774pub async fn run_once<T: Send + 'static>(
1775 tt: Arc<dyn TurboTasksApi>,
1776 future: impl Future<Output = Result<T>> + Send + 'static,
1777) -> Result<T> {
1778 let (tx, rx) = tokio::sync::oneshot::channel();
1779
1780 tt.run_once(Box::pin(async move {
1781 let result = future.await?;
1782 tx.send(result)
1783 .map_err(|_| anyhow!("unable to send result"))?;
1784 Ok(())
1785 }))
1786 .await?;
1787
1788 Ok(rx.await?)
1789}
1790
1791pub async fn run_once_with_reason<T: Send + 'static>(
1792 tt: Arc<dyn TurboTasksApi>,
1793 reason: impl InvalidationReason,
1794 future: impl Future<Output = Result<T>> + Send + 'static,
1795) -> Result<T> {
1796 let (tx, rx) = tokio::sync::oneshot::channel();
1797
1798 tt.run_once_with_reason(
1799 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1800 Box::pin(async move {
1801 let result = future.await?;
1802 tx.send(result)
1803 .map_err(|_| anyhow!("unable to send result"))?;
1804 Ok(())
1805 }),
1806 )
1807 .await?;
1808
1809 Ok(rx.await?)
1810}
1811
1812pub fn dynamic_call(
1814 func: &'static NativeFunction,
1815 this: Option<RawVc>,
1816 arg: &mut dyn StackDynTaskInputs,
1817 persistence: TaskPersistence,
1818) -> RawVc {
1819 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1820}
1821
1822pub fn trait_call(
1824 trait_method: &'static TraitMethod,
1825 this: RawVc,
1826 arg: &mut dyn StackDynTaskInputs,
1827 persistence: TaskPersistence,
1828) -> RawVc {
1829 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1830}
1831
1832pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1833 TURBO_TASKS.with(|arc| arc.clone())
1834}
1835
1836pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1837 TURBO_TASKS.with(Arc::downgrade)
1838}
1839
1840pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1841 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1842}
1843
1844pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1845 TURBO_TASKS.with(|arc| func(arc))
1846}
1847
1848pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1849 TURBO_TASKS.sync_scope(tt, f)
1850}
1851
1852pub fn turbo_tasks_future_scope<T>(
1853 tt: Arc<dyn TurboTasksApi>,
1854 f: impl Future<Output = T>,
1855) -> impl Future<Output = T> {
1856 TURBO_TASKS.scope(tt, f)
1857}
1858
1859pub fn with_turbo_tasks_for_testing<T>(
1860 tt: Arc<dyn TurboTasksApi>,
1861 current_task: TaskId,
1862 execution_id: ExecutionId,
1863 f: impl Future<Output = T>,
1864) -> impl Future<Output = T> {
1865 TURBO_TASKS.scope(
1866 tt,
1867 CURRENT_TASK_STATE.scope(
1868 Arc::new(RwLock::new(CurrentTaskState::new(
1869 current_task,
1870 execution_id,
1871 TaskPriority::initial(),
1872 false, ))),
1874 f,
1875 ),
1876 )
1877}
1878
1879pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1884 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1885}
1886
1887pub fn current_task_for_testing() -> Option<TaskId> {
1888 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1889}
1890
1891pub fn mark_finished() {
1894 with_turbo_tasks(|tt| {
1895 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1896 });
1897}
1898
1899pub fn get_serialization_invalidator() -> SerializationInvalidator {
1905 CURRENT_TASK_STATE.with(|cell| {
1906 let CurrentTaskState {
1907 task_id,
1908 #[cfg(feature = "verify_determinism")]
1909 stateful,
1910 ..
1911 } = &mut *cell.write().unwrap();
1912 #[cfg(feature = "verify_determinism")]
1913 {
1914 *stateful = true;
1915 }
1916 let Some(task_id) = *task_id else {
1917 panic!(
1918 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1919 task execution"
1920 );
1921 };
1922 SerializationInvalidator::new(task_id)
1923 })
1924}
1925
1926pub fn mark_invalidator() {
1927 CURRENT_TASK_STATE.with(|cell| {
1928 let CurrentTaskState {
1929 has_invalidator, ..
1930 } = &mut *cell.write().unwrap();
1931 *has_invalidator = true;
1932 })
1933}
1934
1935pub fn mark_stateful() {
1941 #[cfg(feature = "verify_determinism")]
1942 {
1943 CURRENT_TASK_STATE.with(|cell| {
1944 let CurrentTaskState { stateful, .. } = &mut *cell.write().unwrap();
1945 *stateful = true;
1946 })
1947 }
1948 }
1950
1951pub fn mark_top_level_task() {
1955 if cfg!(debug_assertions) {
1956 CURRENT_TASK_STATE.with(|cell| {
1957 cell.write().unwrap().in_top_level_task = true;
1958 })
1959 }
1960}
1961
1962pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1973 if cfg!(debug_assertions) {
1974 CURRENT_TASK_STATE.with(|cell| {
1975 cell.write().unwrap().in_top_level_task = false;
1976 })
1977 }
1978}
1979
1980pub fn prevent_gc() {
1981 }
1983
1984pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1985 with_turbo_tasks(|tt| {
1986 let raw_vc = collectible.node.node;
1987 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1988 })
1989}
1990
1991pub(crate) async fn read_task_output(
1992 this: &dyn TurboTasksApi,
1993 id: TaskId,
1994 options: ReadOutputOptions,
1995) -> Result<RawVc> {
1996 loop {
1997 match this.try_read_task_output(id, options)? {
1998 Ok(result) => return Ok(result),
1999 Err(listener) => listener.await,
2000 }
2001 }
2002}
2003
2004#[derive(Clone, Copy)]
2010pub struct CurrentCellRef {
2011 current_task: TaskId,
2012 index: CellId,
2013}
2014
2015type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2016
2017impl CurrentCellRef {
2018 fn conditional_update<T>(
2020 &self,
2021 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
2022 ) where
2023 T: VcValueType,
2024 {
2025 self.conditional_update_with_shared_reference(|old_shared_reference| {
2026 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2027 let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
2028 Some((
2029 SharedReference::new(triomphe::Arc::new(new_value)),
2030 updated_key_hashes,
2031 content_hash,
2032 ))
2033 })
2034 }
2035
2036 fn conditional_update_with_shared_reference(
2038 &self,
2039 functor: impl FnOnce(
2040 Option<&SharedReference>,
2041 ) -> Option<(
2042 SharedReference,
2043 Option<SmallVec<[u64; 2]>>,
2044 Option<CellHash>,
2045 )>,
2046 ) {
2047 let tt = turbo_tasks();
2048 let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
2049 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2050 if let Some((update, updated_key_hashes, content_hash)) = update {
2051 tt.update_own_task_cell(
2052 self.current_task,
2053 self.index,
2054 CellContent(Some(update)),
2055 updated_key_hashes,
2056 content_hash,
2057 VerificationMode::EqualityCheck,
2058 )
2059 }
2060 }
2061
2062 pub fn compare_and_update<T>(&self, new_value: T)
2096 where
2097 T: PartialEq + VcValueType,
2098 {
2099 self.conditional_update(|old_value| {
2100 if let Some(old_value) = old_value
2101 && old_value == &new_value
2102 {
2103 return None;
2104 }
2105 Some((new_value, None, None))
2106 });
2107 }
2108
2109 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2117 where
2118 T: VcValueType + PartialEq,
2119 {
2120 self.conditional_update_with_shared_reference(|old_sr| {
2121 if let Some(old_sr) = old_sr {
2122 let old_value = extract_sr_value::<T>(old_sr);
2123 let new_value = extract_sr_value::<T>(&new_shared_reference);
2124 if old_value == new_value {
2125 return None;
2126 }
2127 }
2128 Some((new_shared_reference, None, None))
2129 });
2130 }
2131
2132 pub fn hashed_compare_and_update<T>(&self, new_value: T)
2141 where
2142 T: PartialEq + DeterministicHash + VcValueType,
2143 {
2144 self.conditional_update(|old_value| {
2145 if let Some(old_value) = old_value
2146 && old_value == &new_value
2147 {
2148 return None;
2149 }
2150 let content_hash = hash_xxh3_hash128(&new_value);
2151 Some((new_value, None, Some(content_hash)))
2152 });
2153 }
2154
2155 pub fn hashed_compare_and_update_with_shared_reference<T>(
2161 &self,
2162 new_shared_reference: SharedReference,
2163 ) where
2164 T: VcValueType + PartialEq + DeterministicHash,
2165 {
2166 self.conditional_update_with_shared_reference(move |old_sr| {
2167 if let Some(old_sr) = old_sr {
2168 let old_value = extract_sr_value::<T>(old_sr);
2169 let new_value = extract_sr_value::<T>(&new_shared_reference);
2170 if old_value == new_value {
2171 return None;
2172 }
2173 }
2174 let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2175 Some((new_shared_reference, None, Some(content_hash)))
2176 });
2177 }
2178
2179 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2181 where
2182 T: PartialEq + VcValueType,
2183 VcReadTarget<T>: KeyedEq,
2184 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2185 {
2186 self.conditional_update(|old_value| {
2187 let Some(old_value) = old_value else {
2188 return Some((new_value, None, None));
2189 };
2190 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2191 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2192 let updated_keys = old_value.different_keys(new_value_ref);
2193 if updated_keys.is_empty() {
2194 return None;
2195 }
2196 let updated_key_hashes = updated_keys
2198 .into_iter()
2199 .map(|key| FxBuildHasher.hash_one(key))
2200 .collect();
2201 Some((new_value, Some(updated_key_hashes), None))
2202 });
2203 }
2204
2205 pub fn keyed_compare_and_update_with_shared_reference<T>(
2208 &self,
2209 new_shared_reference: SharedReference,
2210 ) where
2211 T: VcValueType + PartialEq,
2212 VcReadTarget<T>: KeyedEq,
2213 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2214 {
2215 self.conditional_update_with_shared_reference(|old_sr| {
2216 let Some(old_sr) = old_sr else {
2217 return Some((new_shared_reference, None, None));
2218 };
2219 let old_value = extract_sr_value::<T>(old_sr);
2220 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2221 let new_value = extract_sr_value::<T>(&new_shared_reference);
2222 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2223 let updated_keys = old_value.different_keys(new_value);
2224 if updated_keys.is_empty() {
2225 return None;
2226 }
2227 let updated_key_hashes = updated_keys
2229 .into_iter()
2230 .map(|key| FxBuildHasher.hash_one(key))
2231 .collect();
2232 Some((new_shared_reference, Some(updated_key_hashes), None))
2233 });
2234 }
2235
2236 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2238 where
2239 T: VcValueType,
2240 {
2241 let tt = turbo_tasks();
2242 tt.update_own_task_cell(
2243 self.current_task,
2244 self.index,
2245 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2246 None,
2247 None,
2248 verification_mode,
2249 )
2250 }
2251
2252 pub fn update_with_shared_reference(
2260 &self,
2261 shared_ref: SharedReference,
2262 verification_mode: VerificationMode,
2263 ) {
2264 let tt = turbo_tasks();
2265 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2266 let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2267 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2268 shared_ref_exp != shared_ref
2270 } else {
2271 true
2272 }
2273 } else {
2274 true
2275 };
2276 if update {
2277 tt.update_own_task_cell(
2278 self.current_task,
2279 self.index,
2280 CellContent(Some(shared_ref)),
2281 None,
2282 None,
2283 verification_mode,
2284 )
2285 }
2286 }
2287}
2288
2289impl From<CurrentCellRef> for RawVc {
2290 fn from(cell: CurrentCellRef) -> Self {
2291 RawVc::TaskCell(cell.current_task, cell.index)
2292 }
2293}
2294
2295fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2296 sr.0.downcast_ref::<T>()
2297 .expect("cannot update SharedReference of different type")
2298}
2299
2300pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2301 find_cell_by_id(T::get_value_type_id())
2302}
2303
2304pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2305 CURRENT_TASK_STATE.with(|ts| {
2306 let current_task = current_task("celling turbo_tasks values");
2307 let mut ts = ts.write().unwrap();
2308 let map = ts.cell_counters.as_mut().unwrap();
2309 let current_index = map.entry(ty).or_default();
2310 let index = *current_index;
2311 *current_index += 1;
2312 CurrentCellRef {
2313 current_task,
2314 index: CellId { type_id: ty, index },
2315 }
2316 })
2317}
2318
2319pub(crate) async fn read_local_output(
2320 this: &dyn TurboTasksApi,
2321 execution_id: ExecutionId,
2322 local_task_id: LocalTaskId,
2323) -> Result<RawVc> {
2324 loop {
2325 match this.try_read_local_output(execution_id, local_task_id)? {
2326 Ok(raw_vc) => return Ok(raw_vc),
2327 Err(event_listener) => event_listener.await,
2328 }
2329 }
2330}