1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 panic::AssertUnwindSafe,
8 pin::Pin,
9 process::abort,
10 sync::{
11 Arc, Mutex, RwLock, Weak,
12 atomic::{AtomicBool, AtomicUsize, Ordering},
13 },
14 time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32 VcValueTrait, VcValueType,
33 backend::{
34 Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35 TurboTasksExecutionError, TypedCellContent, VerificationMode,
36 },
37 capture_future::CaptureFuture,
38 dyn_task_inputs::StackDynTaskInputs,
39 event::{Event, EventListener},
40 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41 id_factory::IdFactoryWithReuse,
42 keyed::KeyedEq,
43 local_task_tracker::LocalTaskTracker,
44 macro_helpers::NativeFunction,
45 message_queue::{CompilationEvent, CompilationEventQueue},
46 priority_runner::{Executor, PriorityRunner},
47 raw_vc::{CellId, RawVc},
48 registry,
49 serialization_invalidation::SerializationInvalidator,
50 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51 task_statistics::TaskStatisticsApi,
52 trace::TraceRawVcs,
53 util::{IdFactory, StaticOrArc},
54};
55
/// Call-side API of turbo-tasks: invoking task functions and driving top-level futures.
///
/// All futures are passed and returned boxed. The implementation for [`TurboTasks`] in this file
/// forwards every method to the inherent method of the same name.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with the optional receiver `this` and the arguments `arg`.
    ///
    /// In the [`TurboTasks`] implementation the call goes straight to `native_call` when `this`
    /// and `arg` are already resolved; otherwise a `ResolveNative` local task is scheduled and a
    /// `RawVc::LocalOutput` is returned.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` without any resolving step.
    ///
    /// In the [`TurboTasks`] implementation this asks the backend to get or create the task and
    /// returns a `RawVc::TaskOutput` pointing at it.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls the trait method `trait_method` on `this`.
    ///
    /// In the [`TurboTasks`] implementation the method is looked up in the registry when `this` is
    /// a `RawVc::TaskCell`; otherwise (or when the lookup finds nothing) a `ResolveTrait` local
    /// task is scheduled.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` inside a turbo-tasks scope and returns its result, with errors and panics
    /// reported as [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a transient "once" task and resolves with its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`TurboTasksCallApi::run_once`], but first records `reason` in the set of
    /// invalidation reasons reported with the next aggregated update.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Spawns `future` as a detached "once" process; nothing is returned to the caller.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event to subscribers.
    ///
    /// The [`TurboTasks`] implementation logs a warning when sending fails instead of returning
    /// an error.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a human-readable name for `task`, as provided by the backend.
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// Backend-independent turbo-tasks API, used as `Arc<dyn TurboTasksApi>` via the `TURBO_TASKS`
/// task-local.
///
/// NOTE(review): the implementation of this trait lies outside the visible part of the file, so
/// the method descriptions below are derived from the signatures only; confirm against the
/// implementation before relying on details.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason` for the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task` (presumably its persisted form; TODO confirm).
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`.
    ///
    /// The outer `Result` carries failures. The inner `Err(EventListener)` presumably means the
    /// output is not available yet and the listener can be awaited before retrying; TODO confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read the cell `index` of `task`; the return value has the same shape as
    /// [`TurboTasksApi::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Tries to read the output of the local task `local_task_id` created during the execution
    /// identified by `execution_id`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of trait `trait_id` for `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Takes back `count` emissions of `collectible` for the trait `trait_type`.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Takes back all emissions listed in `collectibles` for the trait `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Tries to read the cell `index` of `current_task`.
    ///
    /// NOTE(review): unlike the other `try_read_*` methods this takes no tracking options, so it
    /// presumably does not track a dependency; verify before using it outside the owning task.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` of `task`.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Stores `content` in the cell `index` of `task`.
    ///
    /// `updated_key_hashes`, `content_hash` and `verification_mode` are passed along with the new
    /// content; their exact interpretation is not visible from here.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);

    /// Connects `task` (presumably to the currently executing task; TODO confirm).
    fn connect_task(&self, task: TaskId);

    /// Spawns `f` detached. As the name says, this is intended for tests only.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the system and resolves once stopping has completed.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events.
    ///
    /// `event_types` presumably restricts the subscription to the named event types, with `None`
    /// meaning all events; TODO confirm.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns whether dependency tracking is enabled.
    fn is_tracking_dependencies(&self) -> bool;
}
199
/// Wrapper around a value that is handed out as "not in use yet".
///
/// In this file it wraps freshly allocated [`TaskId`]s (see `TurboTasksBackendApi`). Creating
/// the wrapper and peeking at the value are `unsafe`; consuming it with [`Unused::into`] is safe.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` without any check.
    ///
    /// # Safety
    ///
    /// The contract is not spelled out in the code; presumably the caller must guarantee that
    /// `inner` really is unused. TODO confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// The contract is not spelled out in the code; presumably the caller must not start using
    /// the value through this reference. TODO confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
229
/// API that [`TurboTasks`] exposes to its backend `B`.
///
/// NOTE(review): the implementation of this trait is outside the visible part of the file; the
/// descriptions below follow the signatures and should be confirmed against it.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owned, reference-counted handle to this API.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a fresh task id for a persistent task.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a fresh task id for a transient task.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// Presumably the caller must ensure the id is no longer used anywhere. TODO confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// Presumably the caller must ensure the id is no longer used anywhere. TODO confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution with the given `priority`.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the currently executing task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules a backend job that counts as foreground work.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend job that counts as background work.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns the duration from program start until `instant` (presumably measured from the
    /// `program_start` timestamp taken in `TurboTasks::new`; TODO confirm).
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns whether the system is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
269
/// Summary of the foreground work done since the last time update info was taken, as returned
/// by `TurboTasks::aggregated_update_info`.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated time between the start of foreground work and the moment it finished.
    pub duration: Duration,
    /// Accumulated number of tasks that were scheduled.
    pub tasks: usize,
    /// Invalidation reasons recorded since the last time update info was taken.
    pub reasons: InvalidationReasonSet,
    /// Private field so that code outside this module cannot construct the struct with a
    /// literal; this allows adding fields later.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
278
/// Persistence class of a task, passed along with every call (`dynamic_call`, `native_call`,
/// `trait_call`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// NOTE(review): presumably tasks that may be persisted across sessions; confirm against the
    /// backend.
    Persistent,

    /// NOTE(review): presumably tasks that only live in memory for the current session; confirm
    /// against the backend.
    Transient,
}
292
293impl Display for TaskPersistence {
294 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295 match self {
296 TaskPersistence::Persistent => write!(f, "persistent"),
297 TaskPersistence::Transient => write!(f, "transient"),
298 }
299 }
300}
301
/// Consistency level requested for a read (see e.g. `TurboTasks::wait_task_completion`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default.
    ///
    /// NOTE(review): presumably the read may observe values that are recomputed later; confirm
    /// against the backend.
    #[default]
    Eventual,
    /// NOTE(review): presumably the read waits until everything the value depends on is settled;
    /// confirm against the backend.
    Strong,
}
314
/// Tracking mode for cell reads.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// The read is always tracked. This is the default, with `key: None`.
    Tracked {
        /// Optional key attached to the tracked read.
        key: Option<u64>,
    },
    /// The read is tracked only when it resulted in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadCellTracking {
    /// Returns whether a read in this mode should be tracked, given whether the read resulted in
    /// an error (`is_err`).
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Tracked { .. } => true,
            Self::TrackOnlyError => is_err,
            Self::Untracked => false,
        }
    }

    /// Returns the key of a [`ReadCellTracking::Tracked`] read; `None` for all other modes.
    pub fn key(&self) -> Option<u64> {
        match *self {
            Self::Tracked { key } => key,
            Self::TrackOnlyError | Self::Untracked => None,
        }
    }
}

impl Default for ReadCellTracking {
    /// Tracked, without a key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
368
/// Tracking mode for reads that carry no key (used for output reads, see
/// `TurboTasks::wait_task_completion`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// The read is always tracked. This is the default.
    #[default]
    Tracked,
    /// The read is tracked only when it resulted in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadTracking {
    /// Returns whether a read in this mode should be tracked, given whether the read resulted in
    /// an error (`is_err`).
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Tracked => true,
            Self::TrackOnlyError => is_err,
            Self::Untracked => false,
        }
    }
}

impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
405
/// Priority under which a task is handed to the priority runner.
///
/// The derived `Ord` follows the declaration order of the variants
/// (`Initial < Invalidation { .. } < Recomputation`), so the order of the variants must not be
/// changed casually. How the runner interprets this ordering is not visible in this file.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// The default priority; used for root and once tasks and for `TurboTasks::run`.
    #[default]
    Initial,
    /// Priority of work caused by an invalidation.
    Invalidation {
        /// Wrapped in `Reverse`, so a larger number compares as smaller.
        priority: Reverse<u32>,
    },
    /// NOTE(review): presumably used when a task is recomputed; confirm against the backend.
    Recomputation,
}
415
416impl TaskPriority {
417 pub fn invalidation(priority: u32) -> Self {
418 Self::Invalidation {
419 priority: Reverse(priority),
420 }
421 }
422
423 pub fn initial() -> Self {
424 Self::Initial
425 }
426
427 pub fn leaf() -> Self {
428 Self::Invalidation {
429 priority: Reverse(0),
430 }
431 }
432
433 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
434 match self {
435 TaskPriority::Initial => parent_priority,
436 TaskPriority::Invalidation { priority } => {
437 if let TaskPriority::Invalidation {
438 priority: parent_priority,
439 } = parent_priority
440 && priority.0 < parent_priority.0
441 {
442 Self::Invalidation {
443 priority: Reverse(parent_priority.0.saturating_add(1)),
444 }
445 } else {
446 *self
447 }
448 }
449 TaskPriority::Recomputation => TaskPriority::Recomputation,
450 }
451 }
452}
453
454impl Display for TaskPriority {
455 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456 match self {
457 TaskPriority::Initial => write!(f, "initial"),
458 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
459 TaskPriority::Recomputation => write!(f, "recomputation"),
460 }
461 }
462}
463
/// Unit of work handed to the priority runner and executed by `TurboTasksExecutor`.
enum ScheduledTask {
    /// Execution of a regular task, scheduled by `TurboTasks::schedule`.
    Task {
        /// The task to execute.
        task_id: TaskId,
        /// The span that was current when the task was scheduled.
        span: Span,
    },
    /// Execution of a local task, scheduled by `TurboTasks::schedule_local_task`.
    LocalTask {
        /// What to resolve and with which receiver/arguments.
        ty: LocalTaskSpec,
        /// Persistence passed on to the resolved call.
        persistence: TaskPersistence,
        /// Id under which the local task was registered in the parent's `LocalTaskTracker`.
        local_task_id: LocalTaskId,
        /// State of the task that created this local task; the local task runs with it as its
        /// `CURRENT_TASK_STATE`.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// The span that was current when the local task was scheduled.
        span: Span,
    },
}
477
/// The turbo-tasks manager: owns the backend, hands out ids, schedules tasks on the priority
/// runner and keeps track of foreground/background work.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference, upgraded by `pin()` to obtain an `Arc<Self>`.
    this: Weak<Self>,
    backend: B,
    /// Id factory for the range below `TRANSIENT_TASK_BIT`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory for the range starting at `TRANSIENT_TASK_BIT`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Source of execution ids; a new one is drawn for every task execution and every `run()`.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait()`; tasks and jobs that start afterwards are skipped.
    stopped: AtomicBool,
    /// Number of foreground jobs that have begun but not finished yet.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs that have begun but not finished yet.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of tasks scheduled since the counter was last reset; reported as
    /// `UpdateInfo::tasks`.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Time at which the foreground job counter last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` of finished foreground phases plus the invalidation
    /// reasons collected so far; consumed by `aggregated_update_info()`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when `currently_scheduled_foreground_jobs` goes from zero to one.
    event_foreground_start: Event,
    /// Notified when `currently_scheduled_foreground_jobs` drops to zero.
    event_foreground_done: Event,
    /// Notified when `currently_scheduled_background_jobs` drops to zero.
    event_background_done: Event,
    /// Time at which this instance was created.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
502
/// State of the task execution that is currently running, stored in the `CURRENT_TASK_STATE`
/// task-local and shared with the local tasks created by that execution.
struct CurrentTaskState {
    /// The executing task; `None` for the temporary state created by `TurboTasks::run`.
    task_id: Option<TaskId>,
    /// Id of this execution; local tasks may only be used within the execution that created
    /// them (see `assert_execution_id`).
    execution_id: ExecutionId,
    /// Priority the execution was scheduled with; local tasks are scheduled with the same one.
    priority: TaskPriority,

    /// Reported to the backend when the execution completes.
    ///
    /// NOTE(review): presumably set when the task creates state in cells; the setter is not
    /// visible in this part of the file.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Reported to the backend when the execution completes.
    ///
    /// NOTE(review): presumably set when the task obtained an invalidator for itself; the setter
    /// is not visible in this part of the file.
    has_invalidator: bool,

    /// `true` for the state created by `TurboTasks::run`, `false` for regular task executions.
    in_top_level_task: bool,

    /// Per value type cell counters; `None` for temporary states. Taken out (leaving `None`)
    /// when the execution completes and handed to the backend.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created by this execution.
    local_tasks: LocalTaskTracker,
}
538
539impl CurrentTaskState {
540 fn new(
541 task_id: TaskId,
542 execution_id: ExecutionId,
543 priority: TaskPriority,
544 in_top_level_task: bool,
545 ) -> Self {
546 Self {
547 task_id: Some(task_id),
548 execution_id,
549 priority,
550 #[cfg(feature = "verify_determinism")]
551 stateful: false,
552 has_invalidator: false,
553 in_top_level_task,
554 cell_counters: Some(AutoMap::default()),
555 local_tasks: LocalTaskTracker::new(),
556 }
557 }
558
559 fn new_temporary(
560 execution_id: ExecutionId,
561 priority: TaskPriority,
562 in_top_level_task: bool,
563 ) -> Self {
564 Self {
565 task_id: None,
566 execution_id,
567 priority,
568 #[cfg(feature = "verify_determinism")]
569 stateful: false,
570 has_invalidator: false,
571 in_top_level_task,
572 cell_counters: None,
573 local_tasks: LocalTaskTracker::new(),
574 }
575 }
576
577 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
578 if self.execution_id != expected_execution_id {
579 panic!(
580 "Local tasks can only be scheduled/awaited within the same execution of the \
581 parent task that created them"
582 );
583 }
584 }
585}
586
task_local! {
    /// The turbo-tasks instance the current tokio task runs under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the task execution the current tokio task belongs to. Local tasks run with the
    /// state of the execution that created them.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// NOTE(review): judging by the name, setting this suppresses a check on eventually
    /// consistent reads in top-level tasks; the check itself is not visible in this part of the
    /// file.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
600
601impl<B: Backend + 'static> TurboTasks<B> {
/// Creates a new instance that drives `backend` and calls the backend's `startup` hook.
///
/// Task ids are split into two ranges: the persistent factory is bounded by `TaskId::MIN` and
/// `TRANSIENT_TASK_BIT - 1`, the transient factory by `TRANSIENT_TASK_BIT` and `TaskId::MAX`.
///
/// # Panics
///
/// Panics if `TRANSIENT_TASK_BIT - 1` or `TRANSIENT_TASK_BIT` cannot be converted to a `TaskId`.
pub fn new(backend: B) -> Arc<Self> {
    let task_id_factory = IdFactoryWithReuse::new(
        TaskId::MIN,
        TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
    );
    let transient_task_id_factory =
        IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
    let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
    // `new_cyclic` lets the instance store a weak reference to itself, which `pin()` upgrades.
    let this = Arc::new_cyclic(|this| Self {
        this: this.clone(),
        backend,
        task_id_factory,
        transient_task_id_factory,
        execution_id_factory,
        stopped: AtomicBool::new(false),
        currently_scheduled_foreground_jobs: AtomicUsize::new(0),
        currently_scheduled_background_jobs: AtomicUsize::new(0),
        scheduled_tasks: AtomicUsize::new(0),
        priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
        start: Default::default(),
        aggregated_update: Default::default(),
        event_foreground_done: Event::new(|| {
            || "TurboTasks::event_foreground_done".to_string()
        }),
        event_foreground_start: Event::new(|| {
            || "TurboTasks::event_foreground_start".to_string()
        }),
        event_background_done: Event::new(|| {
            || "TurboTasks::event_background_done".to_string()
        }),
        program_start: Instant::now(),
        compilation_events: CompilationEventQueue::default(),
    });
    // The backend is started only after the instance is fully constructed.
    this.backend.startup(&*this);
    this
}
643
/// Returns an owning `Arc` to this instance by upgrading the stored weak self-reference.
///
/// # Panics
///
/// Panics if the instance is already being dropped (the upgrade fails).
pub fn pin(&self) -> Arc<Self> {
    self.this.upgrade().unwrap()
}
647
648 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
650 where
651 T: ?Sized,
652 F: Fn() -> Fut + Send + Sync + Clone + 'static,
653 Fut: Future<Output = Result<Vc<T>>> + Send,
654 {
655 let id = self.backend.create_transient_task(
656 TransientTaskType::Root(Box::new(move || {
657 let functor = functor.clone();
658 Box::pin(async move {
659 mark_top_level_task();
660 let raw_vc = functor().await?.node;
661 raw_vc.to_non_local().await
662 })
663 })),
664 self,
665 );
666 self.schedule(id, TaskPriority::initial());
667 id
668 }
669
/// Asks the backend to dispose of the root task `task_id` (see
/// [`TurboTasks::spawn_root_task`]).
pub fn dispose_root_task(&self, task_id: TaskId) {
    self.backend.dispose_root_task(task_id, self);
}
673
674 #[track_caller]
678 fn spawn_once_task<T, Fut>(&self, future: Fut)
679 where
680 T: ?Sized,
681 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
682 {
683 let id = self.backend.create_transient_task(
684 TransientTaskType::Once(Box::pin(async move {
685 mark_top_level_task();
686 let raw_vc = future.await?.node;
687 raw_vc.to_non_local().await
688 })),
689 self,
690 );
691 self.schedule(id, TaskPriority::initial());
692 }
693
694 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
695 &self,
696 future: impl Future<Output = Result<T>> + Send + 'static,
697 ) -> Result<T> {
698 let (tx, rx) = tokio::sync::oneshot::channel();
699 self.spawn_once_task(async move {
700 mark_top_level_task();
701 let result = future.await;
702 tx.send(result)
703 .map_err(|_| anyhow!("unable to send result"))?;
704 Ok(Completion::new())
705 });
706
707 rx.await?
708 }
709
710 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
711 pub async fn run<T: TraceRawVcs + Send + 'static>(
712 &self,
713 future: impl Future<Output = Result<T>> + Send + 'static,
714 ) -> Result<T, TurboTasksExecutionError> {
715 self.begin_foreground_job();
716 let execution_id = self.execution_id_factory.wrapping_get();
718 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
719 execution_id,
720 TaskPriority::initial(),
721 true, )));
723
724 let result = TURBO_TASKS
725 .scope(
726 self.pin(),
727 CURRENT_TASK_STATE.scope(current_task_state, async {
728 let result = CaptureFuture::new(future).await;
729
730 wait_for_local_tasks().await;
732
733 match result {
734 Ok(Ok(value)) => Ok(value),
735 Ok(Err(err)) => Err(err.into()),
736 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
737 }
738 }),
739 )
740 .await;
741 self.finish_foreground_job();
742 result
743 }
744
/// Spawns a tokio task that runs `future` as a "once" task via [`TurboTasks::run_once`].
///
/// # Panics
///
/// The spawned tokio task panics when `run_once` returns an error.
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
    let this = self.pin();
    tokio::spawn(async move {
        this.pin()
            .run_once(async move {
                // The foreground job counter is decremented while `future` runs and incremented
                // again afterwards, so the time spent in `future` is not counted as foreground
                // work.
                // NOTE(review): presumably this keeps a long-running process from preventing
                // the system from becoming idle; confirm.
                this.finish_foreground_job();
                future.await;
                this.begin_foreground_job();
                Ok(())
            })
            .await
            .unwrap()
    });
}
759
/// Calls `native_fn` without resolving `this` or `arg` first.
///
/// The backend gets or creates the task for this call; the returned `RawVc::TaskOutput`
/// refers to the output of that task. The current task, if available, is passed to the backend
/// as the caller.
pub(crate) fn native_call(
    &self,
    native_fn: &'static NativeFunction,
    this: Option<RawVc>,
    arg: &mut dyn StackDynTaskInputs,
    persistence: TaskPersistence,
) -> RawVc {
    RawVc::TaskOutput(self.backend.get_or_create_task(
        native_fn,
        this,
        arg,
        current_task_if_available("turbo_function calls"),
        persistence,
        self,
    ))
}
776
777 pub fn dynamic_call(
778 &self,
779 native_fn: &'static NativeFunction,
780 this: Option<RawVc>,
781 arg: &mut dyn StackDynTaskInputs,
782 persistence: TaskPersistence,
783 ) -> RawVc {
784 if this.is_none_or(|this| this.is_resolved())
785 && native_fn.arg_meta.is_resolved(arg.as_ref())
786 {
787 return self.native_call(native_fn, this, arg, persistence);
788 }
789 let arg = arg.take_box();
791 let task_type = LocalTaskSpec {
792 task_type: LocalTaskType::ResolveNative { native_fn },
793 this,
794 arg,
795 };
796 self.schedule_local_task(task_type, persistence)
797 }
798
799 pub fn trait_call(
800 &self,
801 trait_method: &'static TraitMethod,
802 this: RawVc,
803 arg: &mut dyn StackDynTaskInputs,
804 persistence: TaskPersistence,
805 ) -> RawVc {
806 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
810 match registry::get_value_type(type_id).get_trait_method(trait_method) {
811 Some(native_fn) => {
812 if let Some(filter) = native_fn.arg_meta.filter_owned {
813 let mut arg = (filter)(arg);
814 return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
815 } else {
816 return self.dynamic_call(native_fn, Some(this), arg, persistence);
817 }
818 }
819 None => {
820 }
824 }
825 }
826
827 let task_type = LocalTaskSpec {
829 task_type: LocalTaskType::ResolveTrait { trait_method },
830 this: Some(this),
831 arg: arg.take_box(),
832 };
833
834 self.schedule_local_task(task_type, persistence)
835 }
836
/// Schedules `task_id` for execution on the priority runner with the given `priority`.
///
/// Scheduling begins a foreground job (finished by the executor once the execution is over) and
/// increments the scheduled-task counter that feeds `UpdateInfo::tasks`. The current tracing
/// span is captured and travels with the scheduled task.
#[track_caller]
pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
    self.begin_foreground_job();
    self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

    self.priority_runner.schedule(
        &self.pin(),
        ScheduledTask::Task {
            task_id,
            span: Span::current(),
        },
        priority,
    );
}
851
852 fn schedule_local_task(
853 &self,
854 ty: LocalTaskSpec,
855 persistence: TaskPersistence,
857 ) -> RawVc {
858 let task_type = ty.task_type;
859 let (global_task_state, execution_id, priority, local_task_id) =
860 CURRENT_TASK_STATE.with(|gts| {
861 let mut gts_write = gts.write().unwrap();
862 let local_task_id = gts_write.local_tasks.create(task_type);
863 (
864 Arc::clone(gts),
865 gts_write.execution_id,
866 gts_write.priority,
867 local_task_id,
868 )
869 });
870
871 self.priority_runner.schedule(
872 &self.pin(),
873 ScheduledTask::LocalTask {
874 ty,
875 persistence,
876 local_task_id,
877 global_task_state,
878 span: Span::current(),
879 },
880 priority,
881 );
882
883 RawVc::LocalOutput(execution_id, local_task_id, persistence)
884 }
885
886 fn begin_foreground_job(&self) {
887 if self
888 .currently_scheduled_foreground_jobs
889 .fetch_add(1, Ordering::AcqRel)
890 == 0
891 {
892 *self.start.lock().unwrap() = Some(Instant::now());
893 self.event_foreground_start.notify(usize::MAX);
894 self.backend.idle_end(self);
895 }
896 }
897
898 fn finish_foreground_job(&self) {
899 if self
900 .currently_scheduled_foreground_jobs
901 .fetch_sub(1, Ordering::AcqRel)
902 == 1
903 {
904 self.backend.idle_start(self);
905 let total = self.scheduled_tasks.load(Ordering::Acquire);
908 self.scheduled_tasks.store(0, Ordering::Release);
909 if let Some(start) = *self.start.lock().unwrap() {
910 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
911 if let Some(update) = update.as_mut() {
912 update.0 += start.elapsed();
913 update.1 += total;
914 } else {
915 *update = Some((start.elapsed(), total));
916 }
917 }
918 self.event_foreground_done.notify(usize::MAX);
919 }
920 }
921
/// Registers the start of a background job by incrementing the background job counter.
fn begin_background_job(&self) {
    self.currently_scheduled_background_jobs
        .fetch_add(1, Ordering::Relaxed);
}
926
927 fn finish_background_job(&self) {
928 if self
929 .currently_scheduled_background_jobs
930 .fetch_sub(1, Ordering::Relaxed)
931 == 1
932 {
933 self.event_background_done.notify(usize::MAX);
934 }
935 }
936
/// Returns the number of foreground jobs that have begun but not finished yet.
pub fn get_in_progress_count(&self) -> usize {
    self.currently_scheduled_foreground_jobs
        .load(Ordering::Acquire)
}
941
942 pub async fn wait_task_completion(
954 &self,
955 id: TaskId,
956 consistency: ReadConsistency,
957 ) -> Result<()> {
958 read_task_output(
959 self,
960 id,
961 ReadOutputOptions {
962 tracking: ReadTracking::Untracked,
964 consistency,
965 },
966 )
967 .await?;
968 Ok(())
969 }
970
/// Returns the aggregated update info, waiting without a timeout until one is available.
///
/// This is [`TurboTasks::aggregated_update_info`] with `Duration::MAX` as timeout; in that mode
/// the function has no `None` return path, so the `unwrap` cannot fail.
pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
    self.aggregated_update_info(aggregation, Duration::MAX)
        .await
        .unwrap()
}
978
/// Takes the update info accumulated by finished foreground phases.
///
/// * With a zero `aggregation`, an already available update is returned immediately.
/// * When no update is available yet, this waits for the foreground work to finish. With
///   `timeout == Duration::MAX` the wait is unbounded. Otherwise it first waits (unbounded) for
///   foreground work to start if none is running, and then at most `timeout` for it to finish.
/// * With a non-zero `aggregation`, it then keeps waiting until no foreground phase has finished
///   for `aggregation`, so that updates following each other closely are reported as one.
///
/// Returns `None` when `timeout` is zero or elapses before the foreground work finishes.
///
/// # Panics
///
/// Panics when no update is available after waiting, which happens when another caller took it
/// concurrently.
pub async fn aggregated_update_info(
    &self,
    aggregation: Duration,
    timeout: Duration,
) -> Option<UpdateInfo> {
    // The listener is created before the state is inspected, so a notification that arrives in
    // between is not missed.
    let listener = self
        .event_foreground_done
        .listen_with_note(|| || "wait for update info".to_string());
    let wait_for_finish = {
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if aggregation.is_zero() {
            if let Some((duration, tasks)) = update.take() {
                return Some(UpdateInfo {
                    duration,
                    tasks,
                    reasons: take(reason_set),
                    placeholder_for_future_fields: (),
                });
            } else {
                true
            }
        } else {
            update.is_none()
        }
    };
    if wait_for_finish {
        if timeout == Duration::MAX {
            // Unbounded wait for the foreground work to finish.
            listener.await;
        } else {
            // Again, listen before checking the counter.
            let start_listener = self
                .event_foreground_start
                .listen_with_note(|| || "wait for update info".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                == 0
            {
                // Nothing is running: wait for work to start. This wait is not covered by
                // `timeout`.
                start_listener.await;
            } else {
                drop(start_listener);
            }
            if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                // Timed out before the foreground work finished.
                return None;
            }
        }
    }
    if !aggregation.is_zero() {
        loop {
            select! {
                () = tokio::time::sleep(aggregation) => {
                    break;
                }
                () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                    // Another foreground phase finished: loop again, which restarts the sleep.
                }
            }
        }
    }
    let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
    if let Some((duration, tasks)) = update.take() {
        Some(UpdateInfo {
            duration,
            tasks,
            reasons: take(reason_set),
            placeholder_for_future_fields: (),
        })
    } else {
        panic!("aggregated_update_info must not called concurrently")
    }
}
1055
/// Waits until no background job is running; returns immediately when none is running.
pub async fn wait_background_done(&self) {
    // Listen before checking the counter, so that a notification sent between the check and the
    // await is not missed.
    let listener = self.event_background_done.listen();
    if self
        .currently_scheduled_background_jobs
        .load(Ordering::Acquire)
        != 0
    {
        listener.await;
    }
}
1066
/// Stops the system and waits for running work to finish.
///
/// The steps, all executed inside a turbo-tasks future scope:
/// 1. the backend's `stopping` hook is called,
/// 2. the `stopped` flag is set, so tasks and jobs that start afterwards are skipped,
/// 3. running foreground jobs are awaited, then running background jobs,
/// 4. the backend's `stop` hook is called.
pub async fn stop_and_wait(&self) {
    turbo_tasks_future_scope(self.pin(), async move {
        self.backend.stopping(self);
        self.stopped.store(true, Ordering::Release);
        {
            // Listen before checking the counter to avoid missing the notification.
            let listener = self
                .event_foreground_done
                .listen_with_note(|| || "wait for stop".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        {
            let listener = self.event_background_done.listen();
            if self
                .currently_scheduled_background_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        self.backend.stop(self);
    })
    .await;
}
1097
1098 #[track_caller]
1099 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1100 where
1101 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1102 T::CallOnceFuture: Send,
1103 {
1104 let mut this = self.pin();
1105 this.begin_foreground_job();
1106 tokio::spawn(
1107 TURBO_TASKS
1108 .scope(this.clone(), async move {
1109 if !this.stopped.load(Ordering::Acquire) {
1110 this = func(this.clone()).await;
1111 }
1112 this.finish_foreground_job();
1113 })
1114 .in_current_span(),
1115 );
1116 }
1117
/// Spawns `func` as a background job on the tokio runtime.
///
/// The job is counted as background work from the moment it is scheduled until it has run (or
/// was skipped). `func` receives the `Arc<TurboTasks<B>>` and must hand it back when done. It is
/// skipped when the system has already been stopped. The job runs inside a `TURBO_TASKS` scope
/// and in the current tracing span.
#[track_caller]
pub(crate) fn schedule_background_job<T>(&self, func: T)
where
    T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
    T::CallOnceFuture: Send,
{
    let mut this = self.pin();
    self.begin_background_job();
    tokio::spawn(
        TURBO_TASKS
            .scope(this.clone(), async move {
                if !this.stopped.load(Ordering::Acquire) {
                    // `this` is moved into `func` and handed back when it is done.
                    this = func(this).await;
                }
                this.finish_background_job();
            })
            .in_current_span(),
    );
}
1137
1138 fn finish_current_task_state(&self) -> FinishedTaskState {
1139 CURRENT_TASK_STATE.with(|cell| {
1140 let current_task_state = &*cell.write().unwrap();
1141 FinishedTaskState {
1142 #[cfg(feature = "verify_determinism")]
1143 stateful: current_task_state.stateful,
1144 has_invalidator: current_task_state.has_invalidator,
1145 }
1146 })
1147 }
1148
/// Returns a reference to the backend driven by this instance.
pub fn backend(&self) -> &B {
    &self.backend
}
1152}
1153
/// Stateless executor plugged into the `PriorityRunner`; its `Executor` implementation turns a
/// `ScheduledTask` into the future that runs it.
struct TurboTasksExecutor;
1155
1156async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1161 match AssertUnwindSafe(f).catch_unwind().await {
1162 Ok(r) => r,
1163 Err(_) => {
1164 eprintln!(
1165 "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1166 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1167 https://github.com/vercel/next.js/discussions and include the panic message \
1168 and stack trace above.\n\nAborting."
1169 );
1170 abort();
1171 }
1172 }
1173}
1174
/// Turns scheduled work into futures for the priority runner.
///
/// Both kinds of work run inside a `TURBO_TASKS` scope and a `CURRENT_TASK_STATE` scope, are
/// instrumented with the span captured at scheduling time, and are wrapped in `abort_on_panic`.
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future that executes `scheduled_task` with the given `priority`.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle for the `TURBO_TASKS` scope, one for the future itself.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        // Every execution gets its own execution id and a fresh task state.
                        let execution_id = this.execution_id_factory.wrapping_get();
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false, // in_top_level_task
                        )));
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            // `None` from the backend ends this execution without running
                            // anything.
                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // Local tasks may still be running after the task future itself
                                // has completed.
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                // The cell counters are taken out of the state, leaving `None`.
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // The backend returned a priority on completion: schedule the task
                            // again with it.
                            this.schedule(task_id, stale_priority);
                        }
                        // NOTE(review): presumably balances the `begin_foreground_job()` done by
                        // the `schedule()` call that queued this execution.
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                let task_type = ty.task_type;
                let future = async move {
                    // Span for the resolve step itself; distinct from the outer `span` captured
                    // at scheduling time.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            // A successful resolve links to the resulting `RawVc`; a failure is
                            // stored as an error annotated with the local task type.
                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Record the output in the tracker of the execution that created
                            // this local task.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // The local task runs with the state of the execution that created it.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1328
/// Flags copied out of `CurrentTaskState` when a task execution completes; they are passed to
/// the backend's `task_execution_completed`.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1338
/// Trait-object friendly front for the inherent methods of [`TurboTasks`].
///
/// Inside these methods `self.name(..)` resolves to the inherent method of the same name
/// (inherent methods take precedence over trait methods), so the calls do not recurse.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to the inherent `TurboTasks::dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `TurboTasks::native_call`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `TurboTasks::trait_call`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    /// Boxed version of the inherent `TurboTasks::run`; the returned future keeps the instance
    /// alive through a pinned `Arc`.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Boxed version of the inherent `TurboTasks::run_once`.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Records `reason` in the aggregated update and then behaves like `run_once`.
    ///
    /// The reason is recorded right away, before the returned future is polled.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Scoped so that the lock is released before the future is created.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Forwards to the inherent `TurboTasks::start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    /// Sends `event` to the compilation event queue; a failure is logged as a warning and
    /// otherwise ignored.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    /// Returns the name the backend reports for `task`.
    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1415
1416impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1417 #[instrument(level = "info", skip_all, name = "invalidate")]
1418 fn invalidate(&self, task: TaskId) {
1419 self.backend.invalidate_task(task, self);
1420 }
1421
1422 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1423 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1424 {
1425 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1426 reason_set.insert(reason);
1427 }
1428 self.backend.invalidate_task(task, self);
1429 }
1430
1431 fn invalidate_serialization(&self, task: TaskId) {
1432 self.backend.invalidate_serialization(task, self);
1433 }
1434
1435 #[track_caller]
1436 fn try_read_task_output(
1437 &self,
1438 task: TaskId,
1439 options: ReadOutputOptions,
1440 ) -> Result<Result<RawVc, EventListener>> {
1441 if options.consistency == ReadConsistency::Eventual {
1442 debug_assert_not_in_top_level_task("read_task_output");
1443 }
1444 self.backend.try_read_task_output(
1445 task,
1446 current_task_if_available("reading Vcs"),
1447 options,
1448 self,
1449 )
1450 }
1451
1452 #[track_caller]
1453 fn try_read_task_cell(
1454 &self,
1455 task: TaskId,
1456 index: CellId,
1457 options: ReadCellOptions,
1458 ) -> Result<Result<TypedCellContent, EventListener>> {
1459 let reader = current_task_if_available("reading Vcs");
1460 self.backend
1461 .try_read_task_cell(task, index, reader, options, self)
1462 }
1463
1464 fn try_read_own_task_cell(
1465 &self,
1466 current_task: TaskId,
1467 index: CellId,
1468 ) -> Result<TypedCellContent> {
1469 self.backend
1470 .try_read_own_task_cell(current_task, index, self)
1471 }
1472
1473 #[track_caller]
1474 fn try_read_local_output(
1475 &self,
1476 execution_id: ExecutionId,
1477 local_task_id: LocalTaskId,
1478 ) -> Result<Result<RawVc, EventListener>> {
1479 debug_assert_not_in_top_level_task("read_local_output");
1480 CURRENT_TASK_STATE.with(|gts| {
1481 let gts_read = gts.read().unwrap();
1482
1483 gts_read.assert_execution_id(execution_id);
1488
1489 match gts_read.local_tasks.get(local_task_id) {
1490 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1491 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1492 }
1493 })
1494 }
1495
1496 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1497 self.backend.read_task_collectibles(
1500 task,
1501 trait_id,
1502 current_task_if_available("reading collectibles"),
1503 self,
1504 )
1505 }
1506
1507 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1508 self.backend.emit_collectible(
1509 trait_type,
1510 collectible,
1511 current_task("emitting collectible"),
1512 self,
1513 );
1514 }
1515
1516 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1517 self.backend.unemit_collectible(
1518 trait_type,
1519 collectible,
1520 count,
1521 current_task("emitting collectible"),
1522 self,
1523 );
1524 }
1525
1526 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1527 for (&collectible, &count) in collectibles {
1528 if count > 0 {
1529 self.backend.unemit_collectible(
1530 trait_type,
1531 collectible,
1532 count as u32,
1533 current_task("emitting collectible"),
1534 self,
1535 );
1536 }
1537 }
1538 }
1539
1540 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1541 self.try_read_own_task_cell(task, index)
1542 }
1543
1544 fn update_own_task_cell(
1545 &self,
1546 task: TaskId,
1547 index: CellId,
1548 content: CellContent,
1549 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1550 content_hash: Option<CellHash>,
1551 verification_mode: VerificationMode,
1552 ) {
1553 self.backend.update_task_cell(
1554 task,
1555 index,
1556 content,
1557 updated_key_hashes,
1558 content_hash,
1559 verification_mode,
1560 self,
1561 );
1562 }
1563
1564 fn connect_task(&self, task: TaskId) {
1565 self.backend
1566 .connect_task(task, current_task_if_available("connecting task"), self);
1567 }
1568
1569 fn mark_own_task_as_finished(&self, task: TaskId) {
1570 self.backend.mark_own_task_as_finished(task, self);
1571 }
1572
1573 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1576 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1579 global_task_state
1580 .write()
1581 .unwrap()
1582 .local_tasks
1583 .register_detached();
1584 let wrapped = async move {
1585 struct DropGuard;
1587 impl Drop for DropGuard {
1588 fn drop(&mut self) {
1589 CURRENT_TASK_STATE
1590 .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1591 }
1592 }
1593 let _guard = DropGuard;
1594 fut.await;
1595 };
1596 tokio::spawn(TURBO_TASKS.scope(
1597 turbo_tasks(),
1598 CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1599 ));
1600 }
1601
1602 fn task_statistics(&self) -> &TaskStatisticsApi {
1603 self.backend.task_statistics()
1604 }
1605
1606 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1607 let this = self.pin();
1608 Box::pin(async move {
1609 this.stop_and_wait().await;
1610 })
1611 }
1612
1613 fn subscribe_to_compilation_events(
1614 &self,
1615 event_types: Option<Vec<String>>,
1616 ) -> Receiver<Arc<dyn CompilationEvent>> {
1617 self.compilation_events.subscribe(event_types)
1618 }
1619
1620 fn is_tracking_dependencies(&self) -> bool {
1621 self.backend.is_tracking_dependencies()
1622 }
1623}
1624
1625impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1626 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1627 self.pin()
1628 }
1629 fn backend(&self) -> &B {
1630 &self.backend
1631 }
1632
1633 #[track_caller]
1634 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1635 self.schedule_background_job(async move |this| {
1636 this.backend.run_backend_job(job, &*this).await;
1637 this
1638 })
1639 }
1640
1641 #[track_caller]
1642 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1643 self.schedule_foreground_job(async move |this| {
1644 this.backend.run_backend_job(job, &*this).await;
1645 this
1646 })
1647 }
1648
1649 #[track_caller]
1650 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1651 self.schedule(task, priority)
1652 }
1653
1654 fn get_current_task_priority(&self) -> TaskPriority {
1655 CURRENT_TASK_STATE
1656 .try_with(|task_state| task_state.read().unwrap().priority)
1657 .unwrap_or(TaskPriority::initial())
1658 }
1659
1660 fn program_duration_until(&self, instant: Instant) -> Duration {
1661 instant - self.program_start
1662 }
1663
1664 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1665 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1667 }
1668
1669 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1670 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1672 }
1673
1674 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1675 unsafe { self.task_id_factory.reuse(id.into()) }
1676 }
1677
1678 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1679 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1680 }
1681
1682 fn is_idle(&self) -> bool {
1683 self.currently_scheduled_foreground_jobs
1684 .load(Ordering::Acquire)
1685 == 0
1686 }
1687}
1688
1689async fn wait_for_local_tasks() {
1690 let listener =
1691 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1692 let Some(listener) = listener else {
1693 return;
1694 };
1695 listener.await;
1696}
1697
1698pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1699 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1700 Ok(id) => id,
1701 Err(_) => panic!(
1702 "{from} can only be used in the context of a turbo_tasks task execution or \
1703 turbo_tasks run"
1704 ),
1705 }
1706}
1707
1708pub(crate) fn current_task(from: &str) -> TaskId {
1709 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1710 Ok(Some(id)) => id,
1711 Ok(None) | Err(_) => {
1712 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1713 }
1714 }
1715}
1716
1717#[track_caller]
1720pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1721 if !cfg!(debug_assertions) {
1722 return;
1723 }
1724
1725 let in_top_level = CURRENT_TASK_STATE
1726 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1727 .unwrap_or(true);
1728 if !in_top_level {
1729 panic!("{message}");
1730 }
1731}
1732
1733#[track_caller]
1734pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1735 if !cfg!(debug_assertions) {
1736 return;
1737 }
1738
1739 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1742 .try_with(|&suppressed| suppressed)
1743 .unwrap_or(false);
1744 if suppressed {
1745 return;
1746 }
1747
1748 let in_top_level = CURRENT_TASK_STATE
1749 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1750 .unwrap_or(false);
1751 if in_top_level {
1752 panic!(
1753 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1754 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1755 reads to avoid leaking inconsistent return values."
1756 );
1757 }
1758}
1759
1760pub async fn run<T: Send + 'static>(
1761 tt: Arc<dyn TurboTasksApi>,
1762 future: impl Future<Output = Result<T>> + Send + 'static,
1763) -> Result<T> {
1764 let (tx, rx) = tokio::sync::oneshot::channel();
1765
1766 tt.run(Box::pin(async move {
1767 let result = future.await?;
1768 tx.send(result)
1769 .map_err(|_| anyhow!("unable to send result"))?;
1770 Ok(())
1771 }))
1772 .await?;
1773
1774 Ok(rx.await?)
1775}
1776
1777pub async fn run_once<T: Send + 'static>(
1778 tt: Arc<dyn TurboTasksApi>,
1779 future: impl Future<Output = Result<T>> + Send + 'static,
1780) -> Result<T> {
1781 let (tx, rx) = tokio::sync::oneshot::channel();
1782
1783 tt.run_once(Box::pin(async move {
1784 let result = future.await?;
1785 tx.send(result)
1786 .map_err(|_| anyhow!("unable to send result"))?;
1787 Ok(())
1788 }))
1789 .await?;
1790
1791 Ok(rx.await?)
1792}
1793
1794pub async fn run_once_with_reason<T: Send + 'static>(
1795 tt: Arc<dyn TurboTasksApi>,
1796 reason: impl InvalidationReason,
1797 future: impl Future<Output = Result<T>> + Send + 'static,
1798) -> Result<T> {
1799 let (tx, rx) = tokio::sync::oneshot::channel();
1800
1801 tt.run_once_with_reason(
1802 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1803 Box::pin(async move {
1804 let result = future.await?;
1805 tx.send(result)
1806 .map_err(|_| anyhow!("unable to send result"))?;
1807 Ok(())
1808 }),
1809 )
1810 .await?;
1811
1812 Ok(rx.await?)
1813}
1814
1815pub fn dynamic_call(
1817 func: &'static NativeFunction,
1818 this: Option<RawVc>,
1819 arg: &mut dyn StackDynTaskInputs,
1820 persistence: TaskPersistence,
1821) -> RawVc {
1822 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1823}
1824
1825pub fn trait_call(
1827 trait_method: &'static TraitMethod,
1828 this: RawVc,
1829 arg: &mut dyn StackDynTaskInputs,
1830 persistence: TaskPersistence,
1831) -> RawVc {
1832 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1833}
1834
1835pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1836 TURBO_TASKS.with(|arc| arc.clone())
1837}
1838
1839pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1840 TURBO_TASKS.with(Arc::downgrade)
1841}
1842
1843pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1844 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1845}
1846
1847pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1848 TURBO_TASKS.with(|arc| func(arc))
1849}
1850
/// Runs the synchronous closure `f` with `tt` set as the turbo-tasks instance of the scope and
/// returns the closure's result.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1854
/// Wraps the future `f` so that `tt` is set as the turbo-tasks instance while it is polled.
///
/// The returned future resolves to the output of `f`.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1861
1862pub fn with_turbo_tasks_for_testing<T>(
1863 tt: Arc<dyn TurboTasksApi>,
1864 current_task: TaskId,
1865 execution_id: ExecutionId,
1866 f: impl Future<Output = T>,
1867) -> impl Future<Output = T> {
1868 TURBO_TASKS.scope(
1869 tt,
1870 CURRENT_TASK_STATE.scope(
1871 Arc::new(RwLock::new(CurrentTaskState::new(
1872 current_task,
1873 execution_id,
1874 TaskPriority::initial(),
1875 false, ))),
1877 f,
1878 ),
1879 )
1880}
1881
1882pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1887 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1888}
1889
1890pub fn current_task_for_testing() -> Option<TaskId> {
1891 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1892}
1893
1894pub fn mark_finished() {
1897 with_turbo_tasks(|tt| {
1898 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1899 });
1900}
1901
1902pub fn get_serialization_invalidator() -> SerializationInvalidator {
1908 CURRENT_TASK_STATE.with(|cell| {
1909 let CurrentTaskState {
1910 task_id,
1911 #[cfg(feature = "verify_determinism")]
1912 stateful,
1913 ..
1914 } = &mut *cell.write().unwrap();
1915 #[cfg(feature = "verify_determinism")]
1916 {
1917 *stateful = true;
1918 }
1919 let Some(task_id) = *task_id else {
1920 panic!(
1921 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1922 task execution"
1923 );
1924 };
1925 SerializationInvalidator::new(task_id)
1926 })
1927}
1928
1929pub fn mark_invalidator() {
1930 CURRENT_TASK_STATE.with(|cell| {
1931 let CurrentTaskState {
1932 has_invalidator, ..
1933 } = &mut *cell.write().unwrap();
1934 *has_invalidator = true;
1935 })
1936}
1937
/// Sets the `stateful` flag on the current task state.
///
/// This is a no-op unless the `verify_determinism` feature is enabled.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|cell| {
            cell.write().unwrap().stateful = true;
        })
    }
}
1953
1954pub fn mark_top_level_task() {
1958 if cfg!(debug_assertions) {
1959 CURRENT_TASK_STATE.with(|cell| {
1960 cell.write().unwrap().in_top_level_task = true;
1961 })
1962 }
1963}
1964
1965pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1976 if cfg!(debug_assertions) {
1977 CURRENT_TASK_STATE.with(|cell| {
1978 cell.write().unwrap().in_top_level_task = false;
1979 })
1980 }
1981}
1982
/// Currently a no-op: the body is empty.
///
/// NOTE(review): presumably kept so existing callers keep compiling — confirm before removing.
pub fn prevent_gc() {
}
1986
1987pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1988 with_turbo_tasks(|tt| {
1989 let raw_vc = collectible.node.node;
1990 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1991 })
1992}
1993
1994pub(crate) async fn read_task_output(
1995 this: &dyn TurboTasksApi,
1996 id: TaskId,
1997 options: ReadOutputOptions,
1998) -> Result<RawVc> {
1999 loop {
2000 match this.try_read_task_output(id, options)? {
2001 Ok(result) => return Ok(result),
2002 Err(listener) => listener.await,
2003 }
2004 }
2005}
2006
/// Handle to one cell of a task, identified by the owning task and the cell id.
///
/// `find_cell_by_id` creates these for the currently executing task; the update methods write
/// through `update_own_task_cell`.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owns the cell.
    current_task: TaskId,
    /// Value type id plus per-type index of the cell within the task.
    index: CellId,
}
2017
/// Shorthand for the `Target` type of the [`VcRead`] strategy of the value type `T`.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2019
2020impl CurrentCellRef {
2021 fn conditional_update<T>(
2023 &self,
2024 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
2025 ) where
2026 T: VcValueType,
2027 {
2028 self.conditional_update_with_shared_reference(|old_shared_reference| {
2029 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2030 let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
2031 Some((
2032 SharedReference::new(triomphe::Arc::new(new_value)),
2033 updated_key_hashes,
2034 content_hash,
2035 ))
2036 })
2037 }
2038
2039 fn conditional_update_with_shared_reference(
2041 &self,
2042 functor: impl FnOnce(
2043 Option<&SharedReference>,
2044 ) -> Option<(
2045 SharedReference,
2046 Option<SmallVec<[u64; 2]>>,
2047 Option<CellHash>,
2048 )>,
2049 ) {
2050 let tt = turbo_tasks();
2051 let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
2052 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2053 if let Some((update, updated_key_hashes, content_hash)) = update {
2054 tt.update_own_task_cell(
2055 self.current_task,
2056 self.index,
2057 CellContent(Some(update)),
2058 updated_key_hashes,
2059 content_hash,
2060 VerificationMode::EqualityCheck,
2061 )
2062 }
2063 }
2064
2065 pub fn compare_and_update<T>(&self, new_value: T)
2099 where
2100 T: PartialEq + VcValueType,
2101 {
2102 self.conditional_update(|old_value| {
2103 if let Some(old_value) = old_value
2104 && old_value == &new_value
2105 {
2106 return None;
2107 }
2108 Some((new_value, None, None))
2109 });
2110 }
2111
2112 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2120 where
2121 T: VcValueType + PartialEq,
2122 {
2123 self.conditional_update_with_shared_reference(|old_sr| {
2124 if let Some(old_sr) = old_sr {
2125 let old_value = extract_sr_value::<T>(old_sr);
2126 let new_value = extract_sr_value::<T>(&new_shared_reference);
2127 if old_value == new_value {
2128 return None;
2129 }
2130 }
2131 Some((new_shared_reference, None, None))
2132 });
2133 }
2134
2135 pub fn hashed_compare_and_update<T>(&self, new_value: T)
2144 where
2145 T: PartialEq + DeterministicHash + VcValueType,
2146 {
2147 self.conditional_update(|old_value| {
2148 if let Some(old_value) = old_value
2149 && old_value == &new_value
2150 {
2151 return None;
2152 }
2153 let content_hash = hash_xxh3_hash128(&new_value);
2154 Some((new_value, None, Some(content_hash)))
2155 });
2156 }
2157
2158 pub fn hashed_compare_and_update_with_shared_reference<T>(
2164 &self,
2165 new_shared_reference: SharedReference,
2166 ) where
2167 T: VcValueType + PartialEq + DeterministicHash,
2168 {
2169 self.conditional_update_with_shared_reference(move |old_sr| {
2170 if let Some(old_sr) = old_sr {
2171 let old_value = extract_sr_value::<T>(old_sr);
2172 let new_value = extract_sr_value::<T>(&new_shared_reference);
2173 if old_value == new_value {
2174 return None;
2175 }
2176 }
2177 let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2178 Some((new_shared_reference, None, Some(content_hash)))
2179 });
2180 }
2181
2182 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2184 where
2185 T: PartialEq + VcValueType,
2186 VcReadTarget<T>: KeyedEq,
2187 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2188 {
2189 self.conditional_update(|old_value| {
2190 let Some(old_value) = old_value else {
2191 return Some((new_value, None, None));
2192 };
2193 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2194 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2195 let updated_keys = old_value.different_keys(new_value_ref);
2196 if updated_keys.is_empty() {
2197 return None;
2198 }
2199 let updated_key_hashes = updated_keys
2201 .into_iter()
2202 .map(|key| FxBuildHasher.hash_one(key))
2203 .collect();
2204 Some((new_value, Some(updated_key_hashes), None))
2205 });
2206 }
2207
2208 pub fn keyed_compare_and_update_with_shared_reference<T>(
2211 &self,
2212 new_shared_reference: SharedReference,
2213 ) where
2214 T: VcValueType + PartialEq,
2215 VcReadTarget<T>: KeyedEq,
2216 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2217 {
2218 self.conditional_update_with_shared_reference(|old_sr| {
2219 let Some(old_sr) = old_sr else {
2220 return Some((new_shared_reference, None, None));
2221 };
2222 let old_value = extract_sr_value::<T>(old_sr);
2223 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2224 let new_value = extract_sr_value::<T>(&new_shared_reference);
2225 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2226 let updated_keys = old_value.different_keys(new_value);
2227 if updated_keys.is_empty() {
2228 return None;
2229 }
2230 let updated_key_hashes = updated_keys
2232 .into_iter()
2233 .map(|key| FxBuildHasher.hash_one(key))
2234 .collect();
2235 Some((new_shared_reference, Some(updated_key_hashes), None))
2236 });
2237 }
2238
2239 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2241 where
2242 T: VcValueType,
2243 {
2244 let tt = turbo_tasks();
2245 tt.update_own_task_cell(
2246 self.current_task,
2247 self.index,
2248 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2249 None,
2250 None,
2251 verification_mode,
2252 )
2253 }
2254
2255 pub fn update_with_shared_reference(
2263 &self,
2264 shared_ref: SharedReference,
2265 verification_mode: VerificationMode,
2266 ) {
2267 let tt = turbo_tasks();
2268 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2269 let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2270 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2271 shared_ref_exp != shared_ref
2273 } else {
2274 true
2275 }
2276 } else {
2277 true
2278 };
2279 if update {
2280 tt.update_own_task_cell(
2281 self.current_task,
2282 self.index,
2283 CellContent(Some(shared_ref)),
2284 None,
2285 None,
2286 verification_mode,
2287 )
2288 }
2289 }
2290}
2291
2292impl From<CurrentCellRef> for RawVc {
2293 fn from(cell: CurrentCellRef) -> Self {
2294 RawVc::TaskCell(cell.current_task, cell.index)
2295 }
2296}
2297
2298fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2299 sr.0.downcast_ref::<T>()
2300 .expect("cannot update SharedReference of different type")
2301}
2302
2303pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2304 find_cell_by_id(T::get_value_type_id())
2305}
2306
2307pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2308 CURRENT_TASK_STATE.with(|ts| {
2309 let current_task = current_task("celling turbo_tasks values");
2310 let mut ts = ts.write().unwrap();
2311 let map = ts.cell_counters.as_mut().unwrap();
2312 let current_index = map.entry(ty).or_default();
2313 let index = *current_index;
2314 *current_index += 1;
2315 CurrentCellRef {
2316 current_task,
2317 index: CellId { type_id: ty, index },
2318 }
2319 })
2320}
2321
2322pub(crate) async fn read_local_output(
2323 this: &dyn TurboTasksApi,
2324 execution_id: ExecutionId,
2325 local_task_id: LocalTaskId,
2326) -> Result<RawVc> {
2327 loop {
2328 match this.try_read_local_output(execution_id, local_task_id)? {
2329 Ok(raw_vc) => return Ok(raw_vc),
2330 Err(event_listener) => event_listener.await,
2331 }
2332 }
2333}