1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 panic::AssertUnwindSafe,
8 pin::Pin,
9 process::abort,
10 sync::{
11 Arc, Mutex, RwLock, Weak,
12 atomic::{AtomicBool, AtomicUsize, Ordering},
13 },
14 time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32 VcValueTrait, VcValueType,
33 backend::{
34 Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35 TurboTasksExecutionError, TypedCellContent, VerificationMode,
36 },
37 capture_future::CaptureFuture,
38 dyn_task_inputs::StackDynTaskInputs,
39 event::{Event, EventListener},
40 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41 id_factory::IdFactoryWithReuse,
42 keyed::KeyedEq,
43 local_task_tracker::LocalTaskTracker,
44 macro_helpers::NativeFunction,
45 message_queue::{CompilationEvent, CompilationEventQueue},
46 priority_runner::{Executor, PriorityRunner},
47 raw_vc::{CellId, RawVc},
48 registry,
49 serialization_invalidation::SerializationInvalidator,
50 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51 task_statistics::TaskStatisticsApi,
52 trace::TraceRawVcs,
53 util::{IdFactory, StaticOrArc},
54};
55
/// Entry points for calling turbo-tasks functions and for driving top-level work.
///
/// Futures are passed and returned boxed, which keeps every method usable through a trait
/// object (this file stores an `Arc<dyn TurboTasksApi>`, a subtrait of this one).
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with inputs that may still need resolving.
    ///
    /// The `TurboTasks` implementation in this file forwards to `native_call` when `this` and
    /// `arg` are already resolved, and otherwise schedules a local task for the call.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` directly.
    ///
    /// The `TurboTasks` implementation asks the backend to get or create the task and returns
    /// a `RawVc::TaskOutput` for it.
    /// NOTE(review): presumably the inputs must already be resolved — confirm with the backend.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on `this`.
    ///
    /// The `TurboTasks` implementation dispatches to the concrete native function when `this`
    /// is a `RawVc::TaskCell` whose value type provides the method, and otherwise schedules a
    /// local task that resolves the trait call.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` inside a turbo-tasks context; failures are reported as
    /// `TurboTasksExecutionError`.
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-off task and resolves with its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like `run_once`, but additionally records `reason`.
    ///
    /// The `TurboTasks` implementation inserts `reason` into its aggregated-update reason set
    /// before running the future.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` without handing a handle or result back to the caller.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Publishes `event` to the compilation event queue.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a name for `task` (the `TurboTasks` implementation asks the backend).
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// Task-facing API: invalidation, reading task outputs and cells, collectibles, and
/// bookkeeping for the currently executing task.
///
/// Only the signatures are visible in this part of the file; notes marked NOTE(review) are
/// assumptions to be confirmed against the implementation.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason` for the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    /// NOTE(review): presumably marks the persisted form as stale — confirm.
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read the output of `task`.
    ///
    /// The outer `Result` reports errors. The inner `Err` carries an `EventListener`;
    /// NOTE(review): presumably the output is not ready yet and the listener should be awaited
    /// before retrying — confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read cell `index` of `task`; same nested-`Result` shape as
    /// `try_read_task_output`.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Attempts to read the output of the local task `local_task_id` that belongs to the
    /// execution `execution_id`; same nested-`Result` shape as `try_read_task_output`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Returns the collectibles of trait `trait_id` recorded for `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Removes `count` emissions of `collectible` for `trait_type`.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Removes every collectible listed in `collectibles` for `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Attempts to read cell `index` of `current_task`.
    /// NOTE(review): presumably without registering a dependency, since the task reads its
    /// own cell — confirm.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads cell `index` of `task`, the caller's own task.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into cell `index` of `task`, the caller's own task.
    ///
    /// `updated_key_hashes`, `content_hash` and `verification_mode` are passed along with the
    /// update; NOTE(review): their exact effect is defined by the backend — confirm.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task`, the caller's own task, as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks `task`, the caller's own task, as session dependent.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task`.
    /// NOTE(review): presumably to the currently executing task — confirm.
    fn connect_task(&self, task: TaskId);

    /// Spawns `f` detached; as the name says, intended for tests only.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task statistics API of this instance.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the instance and resolves once it has finished stopping.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events.
    /// NOTE(review): `event_types` presumably filters by event type name, with `None` meaning
    /// all events — confirm.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns whether dependency tracking is active.
    fn is_tracking_dependencies(&self) -> bool;
}
200
/// Wrapper that holds a value of type `T` behind a private field.
///
/// NOTE(review): judging by the name and its use for fresh task ids, the wrapper marks a value
/// that has not been put to use yet — confirm the exact invariant with the id factories.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; presumably the caller must
    /// guarantee that `inner` really is unused — confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must keep the value unused while borrowing it —
    /// confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
230
/// API that a `Backend` implementation uses to talk back to the turbo-tasks manager.
///
/// Only the signatures are visible in this part of the file; notes marked NOTE(review) are
/// assumptions to be confirmed against the implementation.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owning handle to this instance as a trait object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Returns a fresh persistent task id, wrapped in `Unused`.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Returns a fresh transient task id, wrapped in `Unused`.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible here; presumably `id` must not be referenced
    /// anywhere anymore — confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible here; presumably `id` must not be referenced
    /// anywhere anymore — confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution with the given `priority`.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the currently executing task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules `job` as a foreground job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules `job` as a background job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns a duration relative to `instant`.
    /// NOTE(review): presumably the time from program start until `instant` — confirm.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns whether the instance is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns the backend.
    fn backend(&self) -> &B;
}
270
/// Summary of aggregated foreground work, as returned by
/// `TurboTasks::aggregated_update_info`.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated time foreground jobs were running.
    pub duration: Duration,
    /// Accumulated number of scheduled tasks.
    pub tasks: usize,
    /// Invalidation reasons collected since the previous report.
    pub reasons: InvalidationReasonSet,
    // Private field: code outside this module cannot build the struct with a literal,
    // so fields can be added later without breaking it.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
279
/// Persistence class of a task; passed along with every function call in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// NOTE(review): presumably a task the backend may persist — confirm.
    Persistent,

    /// NOTE(review): presumably a task that is never persisted — confirm.
    Transient,
}
293
294impl Display for TaskPersistence {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 match self {
297 TaskPersistence::Persistent => write!(f, "persistent"),
298 TaskPersistence::Transient => write!(f, "transient"),
299 }
300 }
301}
302
/// Consistency level requested for a read; `Eventual` is the default.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// NOTE(review): presumably the read may observe a value that is still being updated —
    /// confirm.
    #[default]
    Eventual,
    /// NOTE(review): presumably the read waits until the value is fully up to date — confirm.
    Strong,
}
315
/// Controls whether a cell read is tracked; the default is `Tracked { key: None }`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// The read is always tracked, optionally together with a key.
    Tracked {
        /// Optional key attached to the tracked read.
        key: Option<u64>,
    },
    /// The read is tracked only when it produced an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadCellTracking {
    /// Returns whether a read with outcome `is_err` should be tracked.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the key of a `Tracked` read; every other variant has no key.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = *self {
            key
        } else {
            None
        }
    }
}

impl Default for ReadCellTracking {
    /// Tracked without a key.
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

impl Display for ReadCellTracking {
    /// Writes a short lowercase description of the tracking mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
369
/// Controls whether a read is tracked; `Tracked` is the default.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// The read is always tracked.
    #[default]
    Tracked,
    /// The read is tracked only when it produced an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadTracking {
    /// Returns whether a read with outcome `is_err` should be tracked.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

impl Display for ReadTracking {
    /// Writes a short lowercase description of the tracking mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
406
/// Priority with which a task is scheduled on the priority runner; `Initial` is the default.
///
/// With the derived ordering, `Initial` compares less than every `Invalidation`, and because
/// of the `Reverse` wrapper an `Invalidation` with a larger number compares less than one with
/// a smaller number.
/// NOTE(review): whether "less" means "runs first" is decided by the priority runner — confirm.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority used for initially scheduled work (see `TaskPriority::initial`).
    #[default]
    Initial,
    /// Priority of work scheduled due to an invalidation.
    Invalidation {
        /// Numeric priority, wrapped in `Reverse` so that comparisons are inverted.
        priority: Reverse<u32>,
    },
}
415
416impl TaskPriority {
417 pub fn invalidation(priority: u32) -> Self {
418 Self::Invalidation {
419 priority: Reverse(priority),
420 }
421 }
422
423 pub fn initial() -> Self {
424 Self::Initial
425 }
426
427 pub fn leaf() -> Self {
428 Self::Invalidation {
429 priority: Reverse(0),
430 }
431 }
432
433 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
434 match self {
435 TaskPriority::Initial => parent_priority,
436 TaskPriority::Invalidation { priority } => {
437 if let TaskPriority::Invalidation {
438 priority: parent_priority,
439 } = parent_priority
440 && priority.0 < parent_priority.0
441 {
442 Self::Invalidation {
443 priority: Reverse(parent_priority.0.saturating_add(1)),
444 }
445 } else {
446 *self
447 }
448 }
449 }
450 }
451}
452
453impl Display for TaskPriority {
454 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455 match self {
456 TaskPriority::Initial => write!(f, "initial"),
457 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
458 }
459 }
460}
461
/// Unit of work handed to the priority runner and executed by `TurboTasksExecutor`.
enum ScheduledTask {
    /// Execution of the backend task `task_id`.
    Task {
        task_id: TaskId,
        /// Tracing span that was current when the task was scheduled.
        span: Span,
    },
    /// Execution of a local task created by `schedule_local_task`.
    LocalTask {
        /// What to resolve, including `this` and the arguments.
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
        /// Id under which the result is stored in the parent's `LocalTaskTracker`.
        local_task_id: LocalTaskId,
        /// State of the task that created this local task; the local task runs with it as
        /// its `CURRENT_TASK_STATE`.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span that was current when the local task was scheduled.
        span: Span,
    },
}
475
/// The turbo-tasks manager: owns the backend, hands out ids, schedules tasks on the priority
/// runner and keeps track of foreground/background job counts.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference; upgraded by `pin` to obtain an owning `Arc`.
    this: Weak<Self>,
    backend: B,
    /// Ids from `TaskId::MIN` up to `TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Ids from `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; scheduled jobs and tasks check it before doing work.
    stopped: AtomicBool,
    /// Number of foreground jobs that have begun but not yet finished.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs that have begun but not yet finished.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of tasks scheduled since the foreground job count last dropped to zero.
    scheduled_tasks: AtomicUsize,
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Time at which the foreground job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` plus the invalidation reasons collected so far;
    /// drained by `aggregated_update_info`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job count goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job count drops back to zero.
    event_foreground_done: Event,
    /// Notified when the background job count drops back to zero.
    event_background_done: Event,
    /// Time at which this instance was created.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
500
/// Per-execution state of the task that is currently running; shared with its local tasks
/// through the `CURRENT_TASK_STATE` task-local.
struct CurrentTaskState {
    /// Id of the executing task; `None` for the temporary state created by `TurboTasks::run`.
    task_id: Option<TaskId>,
    /// Id of this particular execution; checked by `assert_execution_id`.
    execution_id: ExecutionId,
    /// Priority the task runs with; local tasks are scheduled with the same priority.
    priority: TaskPriority,

    /// Only present with the `verify_determinism` feature; reported to the backend when the
    /// execution completes.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Reported to the backend when the execution completes.
    /// NOTE(review): presumably set when the task obtained an invalidator — confirm.
    has_invalidator: bool,

    /// Whether this state belongs to a top-level task.
    in_top_level_task: bool,

    /// Counters per value type, handed to the backend on completion; `None` for temporary
    /// states.
    /// NOTE(review): presumably used to number the cells a task creates — confirm.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created during this execution and their results.
    local_tasks: LocalTaskTracker,
}
536
537impl CurrentTaskState {
538 fn new(
539 task_id: TaskId,
540 execution_id: ExecutionId,
541 priority: TaskPriority,
542 in_top_level_task: bool,
543 ) -> Self {
544 Self {
545 task_id: Some(task_id),
546 execution_id,
547 priority,
548 #[cfg(feature = "verify_determinism")]
549 stateful: false,
550 has_invalidator: false,
551 in_top_level_task,
552 cell_counters: Some(AutoMap::default()),
553 local_tasks: LocalTaskTracker::new(),
554 }
555 }
556
557 fn new_temporary(
558 execution_id: ExecutionId,
559 priority: TaskPriority,
560 in_top_level_task: bool,
561 ) -> Self {
562 Self {
563 task_id: None,
564 execution_id,
565 priority,
566 #[cfg(feature = "verify_determinism")]
567 stateful: false,
568 has_invalidator: false,
569 in_top_level_task,
570 cell_counters: None,
571 local_tasks: LocalTaskTracker::new(),
572 }
573 }
574
575 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
576 if self.execution_id != expected_execution_id {
577 panic!(
578 "Local tasks can only be scheduled/awaited within the same execution of the \
579 parent task that created them"
580 );
581 }
582 }
583}
584
task_local! {
    // The turbo-tasks instance the current tokio task runs under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    // State of the currently executing task; local tasks run with their parent's state.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    // NOTE(review): judging by the name, a scoped flag that disables a check concerning
    // eventually consistent reads in top-level tasks — confirm where it is read.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
598
599impl<B: Backend + 'static> TurboTasks<B> {
600 pub fn new(backend: B) -> Arc<Self> {
606 let task_id_factory = IdFactoryWithReuse::new(
607 TaskId::MIN,
608 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
609 );
610 let transient_task_id_factory =
611 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
612 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
613 let this = Arc::new_cyclic(|this| Self {
614 this: this.clone(),
615 backend,
616 task_id_factory,
617 transient_task_id_factory,
618 execution_id_factory,
619 stopped: AtomicBool::new(false),
620 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
621 currently_scheduled_background_jobs: AtomicUsize::new(0),
622 scheduled_tasks: AtomicUsize::new(0),
623 priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
624 start: Default::default(),
625 aggregated_update: Default::default(),
626 event_foreground_done: Event::new(|| {
627 || "TurboTasks::event_foreground_done".to_string()
628 }),
629 event_foreground_start: Event::new(|| {
630 || "TurboTasks::event_foreground_start".to_string()
631 }),
632 event_background_done: Event::new(|| {
633 || "TurboTasks::event_background_done".to_string()
634 }),
635 program_start: Instant::now(),
636 compilation_events: CompilationEventQueue::default(),
637 });
638 this.backend.startup(&*this);
639 this
640 }
641
/// Returns an owning `Arc` to this instance by upgrading the stored weak self-reference.
///
/// # Panics
///
/// Panics if the weak reference can no longer be upgraded.
pub fn pin(&self) -> Arc<Self> {
    self.this.upgrade().unwrap()
}
645
646 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
648 where
649 T: ?Sized,
650 F: Fn() -> Fut + Send + Sync + Clone + 'static,
651 Fut: Future<Output = Result<Vc<T>>> + Send,
652 {
653 let id = self.backend.create_transient_task(
654 TransientTaskType::Root(Box::new(move || {
655 let functor = functor.clone();
656 Box::pin(async move {
657 mark_top_level_task();
658 let raw_vc = functor().await?.node;
659 raw_vc.to_non_local().await
660 })
661 })),
662 self,
663 );
664 self.schedule(id, TaskPriority::initial());
665 id
666 }
667
/// Asks the backend to dispose the root task `task_id`.
pub fn dispose_root_task(&self, task_id: TaskId) {
    self.backend.dispose_root_task(task_id, self);
}
671
672 #[track_caller]
676 fn spawn_once_task<T, Fut>(&self, future: Fut)
677 where
678 T: ?Sized,
679 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
680 {
681 let id = self.backend.create_transient_task(
682 TransientTaskType::Once(Box::pin(async move {
683 mark_top_level_task();
684 let raw_vc = future.await?.node;
685 raw_vc.to_non_local().await
686 })),
687 self,
688 );
689 self.schedule(id, TaskPriority::initial());
690 }
691
692 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
693 &self,
694 future: impl Future<Output = Result<T>> + Send + 'static,
695 ) -> Result<T> {
696 let (tx, rx) = tokio::sync::oneshot::channel();
697 self.spawn_once_task(async move {
698 mark_top_level_task();
699 let result = future.await;
700 tx.send(result)
701 .map_err(|_| anyhow!("unable to send result"))?;
702 Ok(Completion::new())
703 });
704
705 rx.await?
706 }
707
/// Runs `future` directly on the calling tokio task inside a turbo-tasks context, without
/// creating a backend task for it.
///
/// The call counts as one foreground job. `future` runs with a temporary
/// `CurrentTaskState` (no task id, flagged as top-level); local tasks are awaited before
/// the result is returned. Errors and captured panics are reported as
/// `TurboTasksExecutionError`.
#[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
pub async fn run<T: TraceRawVcs + Send + 'static>(
    &self,
    future: impl Future<Output = Result<T>> + Send + 'static,
) -> Result<T, TurboTasksExecutionError> {
    self.begin_foreground_job();
    let execution_id = self.execution_id_factory.wrapping_get();
    // Temporary state: `true` marks this as a top-level task.
    let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
        execution_id,
        TaskPriority::initial(),
        true,
    )));

    let result = TURBO_TASKS
        .scope(
            self.pin(),
            CURRENT_TASK_STATE.scope(current_task_state, async {
                // The outer `Result` of `CaptureFuture` is `Err` when `future` panicked.
                let result = CaptureFuture::new(future).await;

                // Wait for local tasks before reporting the result.
                wait_for_local_tasks().await;

                match result {
                    Ok(Ok(value)) => Ok(value),
                    Ok(Err(err)) => Err(err.into()),
                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                }
            }),
        )
        .await;
    self.finish_foreground_job();
    result
}
742
/// Spawns a tokio task that runs `future` through `run_once`.
///
/// While `future` itself is running the foreground job count is decremented, so the
/// process does not count as a foreground job; the count is restored afterwards.
///
/// # Panics
///
/// The spawned tokio task panics if `run_once` returns an error.
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
    let this = self.pin();
    tokio::spawn(async move {
        // `this` is moved into the inner future, so a second handle is needed for the
        // `run_once` call itself.
        this.pin()
            .run_once(async move {
                this.finish_foreground_job();
                future.await;
                this.begin_foreground_job();
                Ok(())
            })
            .await
            .unwrap()
    });
}
757
758 pub(crate) fn native_call(
759 &self,
760 native_fn: &'static NativeFunction,
761 this: Option<RawVc>,
762 arg: &mut dyn StackDynTaskInputs,
763 persistence: TaskPersistence,
764 ) -> RawVc {
765 RawVc::TaskOutput(self.backend.get_or_create_task(
766 native_fn,
767 this,
768 arg,
769 current_task_if_available("turbo_function calls"),
770 persistence,
771 self,
772 ))
773 }
774
775 pub fn dynamic_call(
776 &self,
777 native_fn: &'static NativeFunction,
778 this: Option<RawVc>,
779 arg: &mut dyn StackDynTaskInputs,
780 persistence: TaskPersistence,
781 ) -> RawVc {
782 if this.is_none_or(|this| this.is_resolved())
783 && native_fn.arg_meta.is_resolved(arg.as_ref())
784 {
785 return self.native_call(native_fn, this, arg, persistence);
786 }
787 let arg = arg.take_box();
789 let task_type = LocalTaskSpec {
790 task_type: LocalTaskType::ResolveNative { native_fn },
791 this,
792 arg,
793 };
794 self.schedule_local_task(task_type, persistence)
795 }
796
797 pub fn trait_call(
798 &self,
799 trait_method: &'static TraitMethod,
800 this: RawVc,
801 arg: &mut dyn StackDynTaskInputs,
802 persistence: TaskPersistence,
803 ) -> RawVc {
804 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
808 match registry::get_value_type(type_id).get_trait_method(trait_method) {
809 Some(native_fn) => {
810 if let Some(filter) = native_fn.arg_meta.filter_owned {
811 let mut arg = (filter)(arg);
812 return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
813 } else {
814 return self.dynamic_call(native_fn, Some(this), arg, persistence);
815 }
816 }
817 None => {
818 }
822 }
823 }
824
825 let task_type = LocalTaskSpec {
827 task_type: LocalTaskType::ResolveTrait { trait_method },
828 this: Some(this),
829 arg: arg.take_box(),
830 };
831
832 self.schedule_local_task(task_type, persistence)
833 }
834
835 #[track_caller]
836 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
837 self.begin_foreground_job();
838 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
839
840 self.priority_runner.schedule(
841 &self.pin(),
842 ScheduledTask::Task {
843 task_id,
844 span: Span::current(),
845 },
846 priority,
847 );
848 }
849
850 fn schedule_local_task(
851 &self,
852 ty: LocalTaskSpec,
853 persistence: TaskPersistence,
855 ) -> RawVc {
856 let task_type = ty.task_type;
857 let (global_task_state, execution_id, priority, local_task_id) =
858 CURRENT_TASK_STATE.with(|gts| {
859 let mut gts_write = gts.write().unwrap();
860 let local_task_id = gts_write.local_tasks.create(task_type);
861 (
862 Arc::clone(gts),
863 gts_write.execution_id,
864 gts_write.priority,
865 local_task_id,
866 )
867 });
868
869 self.priority_runner.schedule(
870 &self.pin(),
871 ScheduledTask::LocalTask {
872 ty,
873 persistence,
874 local_task_id,
875 global_task_state,
876 span: Span::current(),
877 },
878 priority,
879 );
880
881 RawVc::LocalOutput(execution_id, local_task_id, persistence)
882 }
883
884 fn begin_foreground_job(&self) {
885 if self
886 .currently_scheduled_foreground_jobs
887 .fetch_add(1, Ordering::AcqRel)
888 == 0
889 {
890 *self.start.lock().unwrap() = Some(Instant::now());
891 self.event_foreground_start.notify(usize::MAX);
892 self.backend.idle_end(self);
893 }
894 }
895
896 fn finish_foreground_job(&self) {
897 if self
898 .currently_scheduled_foreground_jobs
899 .fetch_sub(1, Ordering::AcqRel)
900 == 1
901 {
902 self.backend.idle_start(self);
903 let total = self.scheduled_tasks.load(Ordering::Acquire);
906 self.scheduled_tasks.store(0, Ordering::Release);
907 if let Some(start) = *self.start.lock().unwrap() {
908 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
909 if let Some(update) = update.as_mut() {
910 update.0 += start.elapsed();
911 update.1 += total;
912 } else {
913 *update = Some((start.elapsed(), total));
914 }
915 }
916 self.event_foreground_done.notify(usize::MAX);
917 }
918 }
919
/// Registers one more running background job.
fn begin_background_job(&self) {
    self.currently_scheduled_background_jobs
        .fetch_add(1, Ordering::Relaxed);
}
924
925 fn finish_background_job(&self) {
926 if self
927 .currently_scheduled_background_jobs
928 .fetch_sub(1, Ordering::Relaxed)
929 == 1
930 {
931 self.event_background_done.notify(usize::MAX);
932 }
933 }
934
/// Returns the number of foreground jobs that have begun but not yet finished.
pub fn get_in_progress_count(&self) -> usize {
    self.currently_scheduled_foreground_jobs
        .load(Ordering::Acquire)
}
939
940 pub async fn wait_task_completion(
952 &self,
953 id: TaskId,
954 consistency: ReadConsistency,
955 ) -> Result<()> {
956 read_task_output(
957 self,
958 id,
959 ReadOutputOptions {
960 tracking: ReadTracking::Untracked,
962 consistency,
963 },
964 )
965 .await?;
966 Ok(())
967 }
968
/// Like `aggregated_update_info`, but without a timeout: waits as long as necessary.
///
/// The `unwrap` relies on `aggregated_update_info` returning `None` only on its timeout
/// path, which is not taken for `Duration::MAX`.
pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
    self.aggregated_update_info(aggregation, Duration::MAX)
        .await
        .unwrap()
}
976
977 pub async fn aggregated_update_info(
981 &self,
982 aggregation: Duration,
983 timeout: Duration,
984 ) -> Option<UpdateInfo> {
985 let listener = self
986 .event_foreground_done
987 .listen_with_note(|| || "wait for update info".to_string());
988 let wait_for_finish = {
989 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
990 if aggregation.is_zero() {
991 if let Some((duration, tasks)) = update.take() {
992 return Some(UpdateInfo {
993 duration,
994 tasks,
995 reasons: take(reason_set),
996 placeholder_for_future_fields: (),
997 });
998 } else {
999 true
1000 }
1001 } else {
1002 update.is_none()
1003 }
1004 };
1005 if wait_for_finish {
1006 if timeout == Duration::MAX {
1007 listener.await;
1009 } else {
1010 let start_listener = self
1012 .event_foreground_start
1013 .listen_with_note(|| || "wait for update info".to_string());
1014 if self
1015 .currently_scheduled_foreground_jobs
1016 .load(Ordering::Acquire)
1017 == 0
1018 {
1019 start_listener.await;
1020 } else {
1021 drop(start_listener);
1022 }
1023 if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1024 return None;
1026 }
1027 }
1028 }
1029 if !aggregation.is_zero() {
1030 loop {
1031 select! {
1032 () = tokio::time::sleep(aggregation) => {
1033 break;
1034 }
1035 () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1036 }
1038 }
1039 }
1040 }
1041 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1042 if let Some((duration, tasks)) = update.take() {
1043 Some(UpdateInfo {
1044 duration,
1045 tasks,
1046 reasons: take(reason_set),
1047 placeholder_for_future_fields: (),
1048 })
1049 } else {
1050 panic!("aggregated_update_info must not called concurrently")
1051 }
1052 }
1053
/// Waits until no background job is running; returns immediately when none is.
pub async fn wait_background_done(&self) {
    // The listener is created before the counter is checked; presumably so that a
    // notification sent between the check and the await is not missed.
    let listener = self.event_background_done.listen();
    if self
        .currently_scheduled_background_jobs
        .load(Ordering::Acquire)
        != 0
    {
        listener.await;
    }
}
1064
/// Stops this instance and waits for running work to drain.
///
/// Order of operations: the backend's `stopping` hook, setting the `stopped` flag, waiting
/// for foreground jobs, waiting for background jobs, and finally the backend's `stop` hook.
/// Everything runs inside a turbo-tasks future scope for this instance.
pub async fn stop_and_wait(&self) {
    turbo_tasks_future_scope(self.pin(), async move {
        self.backend.stopping(self);
        // From here on, scheduled jobs and tasks skip their work.
        self.stopped.store(true, Ordering::Release);
        {
            // Listener first, then the check (same pattern as `wait_background_done`).
            let listener = self
                .event_foreground_done
                .listen_with_note(|| || "wait for stop".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        {
            let listener = self.event_background_done.listen();
            if self
                .currently_scheduled_background_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        self.backend.stop(self);
    })
    .await;
}
1095
1096 #[track_caller]
1097 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1098 where
1099 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1100 T::CallOnceFuture: Send,
1101 {
1102 let mut this = self.pin();
1103 this.begin_foreground_job();
1104 tokio::spawn(
1105 TURBO_TASKS
1106 .scope(this.clone(), async move {
1107 if !this.stopped.load(Ordering::Acquire) {
1108 this = func(this.clone()).await;
1109 }
1110 this.finish_foreground_job();
1111 })
1112 .in_current_span(),
1113 );
1114 }
1115
/// Spawns `func` as a background job on the tokio runtime.
///
/// The job is counted as a background job from the moment it is scheduled until it
/// finishes. `func` receives the instance handle and must hand it back; it is skipped
/// entirely when the instance has already been stopped. The job runs inside the
/// `TURBO_TASKS` scope and the current tracing span.
#[track_caller]
pub(crate) fn schedule_background_job<T>(&self, func: T)
where
    T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
    T::CallOnceFuture: Send,
{
    let mut this = self.pin();
    self.begin_background_job();
    tokio::spawn(
        TURBO_TASKS
            .scope(this.clone(), async move {
                if !this.stopped.load(Ordering::Acquire) {
                    // The handle is moved into `func` and replaced by the one it returns.
                    this = func(this).await;
                }
                this.finish_background_job();
            })
            .in_current_span(),
    );
}
1135
1136 fn finish_current_task_state(&self) -> FinishedTaskState {
1137 CURRENT_TASK_STATE.with(|cell| {
1138 let current_task_state = &*cell.write().unwrap();
1139 FinishedTaskState {
1140 #[cfg(feature = "verify_determinism")]
1141 stateful: current_task_state.stateful,
1142 has_invalidator: current_task_state.has_invalidator,
1143 }
1144 })
1145 }
1146
/// Returns the backend of this instance.
pub fn backend(&self) -> &B {
    &self.backend
}
1150}
1151
/// Stateless `Executor` that the priority runner uses to run `ScheduledTask`s.
struct TurboTasksExecutor;
1153
1154async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1159 match AssertUnwindSafe(f).catch_unwind().await {
1160 Ok(r) => r,
1161 Err(_) => {
1162 eprintln!(
1163 "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1164 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1165 https://github.com/vercel/next.js/discussions and include the panic message \
1166 and stack trace above.\n\nAborting."
1167 );
1168 abort();
1169 }
1170 }
1171}
1172
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future that runs `scheduled_task` with `priority`.
    ///
    /// Both kinds of task run inside the `TURBO_TASKS` scope of `this` and under the span
    /// that was captured when they were scheduled. Work outside the per-task panic capture
    /// is wrapped in `abort_on_panic`.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle for the `TURBO_TASKS` scope, one for the task body.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        // Re-execute for as long as the backend asks for it.
                        let mut schedule_again = true;
                        while schedule_again {
                            // Every execution gets a fresh execution id and task state.
                            let execution_id = this.execution_id_factory.wrapping_get();
                            let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                                task_id,
                                execution_id,
                                priority,
                                false,
                            )));
                            let single_execution_future = async {
                                if this.stopped.load(Ordering::Acquire) {
                                    this.backend.task_execution_canceled(task_id, &*this);
                                    return false;
                                }

                                // `None` means the backend has nothing to execute.
                                let Some(TaskExecutionSpec { future, span }) = this
                                    .backend
                                    .try_start_task_execution(task_id, priority, &*this)
                                else {
                                    return false;
                                };

                                async {
                                    // The outer `Result` is `Err` when the task panicked.
                                    let result = CaptureFuture::new(future).await;

                                    // Wait for local tasks before completing the task.
                                    wait_for_local_tasks().await;

                                    let result = match result {
                                        Ok(Ok(raw_vc)) => {
                                            raw_vc
                                                .to_non_local_unchecked_sync(&*this)
                                                .map_err(|err| err.into())
                                        }
                                        Ok(Err(err)) => Err(err.into()),
                                        Err(err) => {
                                            Err(TurboTasksExecutionError::Panic(Arc::new(err)))
                                        }
                                    };

                                    let finished_state = this.finish_current_task_state();
                                    let cell_counters = CURRENT_TASK_STATE.with(|ts| {
                                        ts.write().unwrap().cell_counters.take().unwrap()
                                    });
                                    // The returned flag decides whether the loop runs again.
                                    this.backend.task_execution_completed(
                                        task_id,
                                        result,
                                        &cell_counters,
                                        #[cfg(feature = "verify_determinism")]
                                        finished_state.stateful,
                                        finished_state.has_invalidator,
                                        &*this,
                                    )
                                }
                                .instrument(span)
                                .await
                            };
                            schedule_again = CURRENT_TASK_STATE
                                .scope(current_task_state, single_execution_future)
                                .await;
                        }
                        // Balances the `begin_foreground_job` call made by `schedule`.
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                // Copied out up front for the error context; `ty` moves into the future.
                let task_type = ty.task_type;
                let future = async move {
                    // Span for the resolve step itself; shadows the scheduling span only
                    // inside this future.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Store the result in the tracker of the task that created
                            // this local task.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Run with the state of the task that created this local task.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1330
/// Snapshot of `CurrentTaskState` flags, taken by `finish_current_task_state` and passed to
/// the backend's `task_execution_completed`.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`; only with the `verify_determinism` feature.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1340
/// Trait-object facing implementation; most methods delegate to the inherent method of the
/// same name on `TurboTasks`.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Delegates to the inherent `dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    /// Delegates to the inherent `native_call`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Delegates to the inherent `trait_call`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    /// Boxes the inherent `run`; the returned future owns a handle to this instance.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Boxes the inherent `run_once`; the returned future owns a handle to this instance.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Records `reason` in the aggregated-update reason set right away and then behaves
    /// like `run_once`.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Scoped so that the lock is released before the future is created.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Delegates to the inherent `start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }

    /// Sends `event` to the compilation event queue; a failure is logged as a warning and
    /// otherwise ignored.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }

    /// Asks the backend for the name of `task`.
    fn get_task_name(&self, task: TaskId) -> String {
        self.backend.get_task_name(task, self)
    }
}
1417
1418impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1419 #[instrument(level = "info", skip_all, name = "invalidate")]
1420 fn invalidate(&self, task: TaskId) {
1421 self.backend.invalidate_task(task, self);
1422 }
1423
1424 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1425 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1426 {
1427 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1428 reason_set.insert(reason);
1429 }
1430 self.backend.invalidate_task(task, self);
1431 }
1432
1433 fn invalidate_serialization(&self, task: TaskId) {
1434 self.backend.invalidate_serialization(task, self);
1435 }
1436
1437 #[track_caller]
1438 fn try_read_task_output(
1439 &self,
1440 task: TaskId,
1441 options: ReadOutputOptions,
1442 ) -> Result<Result<RawVc, EventListener>> {
1443 if options.consistency == ReadConsistency::Eventual {
1444 debug_assert_not_in_top_level_task("read_task_output");
1445 }
1446 self.backend.try_read_task_output(
1447 task,
1448 current_task_if_available("reading Vcs"),
1449 options,
1450 self,
1451 )
1452 }
1453
1454 #[track_caller]
1455 fn try_read_task_cell(
1456 &self,
1457 task: TaskId,
1458 index: CellId,
1459 options: ReadCellOptions,
1460 ) -> Result<Result<TypedCellContent, EventListener>> {
1461 let reader = current_task_if_available("reading Vcs");
1462 self.backend
1463 .try_read_task_cell(task, index, reader, options, self)
1464 }
1465
1466 fn try_read_own_task_cell(
1467 &self,
1468 current_task: TaskId,
1469 index: CellId,
1470 ) -> Result<TypedCellContent> {
1471 self.backend
1472 .try_read_own_task_cell(current_task, index, self)
1473 }
1474
1475 #[track_caller]
1476 fn try_read_local_output(
1477 &self,
1478 execution_id: ExecutionId,
1479 local_task_id: LocalTaskId,
1480 ) -> Result<Result<RawVc, EventListener>> {
1481 debug_assert_not_in_top_level_task("read_local_output");
1482 CURRENT_TASK_STATE.with(|gts| {
1483 let gts_read = gts.read().unwrap();
1484
1485 gts_read.assert_execution_id(execution_id);
1490
1491 match gts_read.local_tasks.get(local_task_id) {
1492 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1493 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1494 }
1495 })
1496 }
1497
1498 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1499 self.backend.read_task_collectibles(
1502 task,
1503 trait_id,
1504 current_task_if_available("reading collectibles"),
1505 self,
1506 )
1507 }
1508
1509 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1510 self.backend.emit_collectible(
1511 trait_type,
1512 collectible,
1513 current_task("emitting collectible"),
1514 self,
1515 );
1516 }
1517
1518 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1519 self.backend.unemit_collectible(
1520 trait_type,
1521 collectible,
1522 count,
1523 current_task("emitting collectible"),
1524 self,
1525 );
1526 }
1527
1528 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1529 for (&collectible, &count) in collectibles {
1530 if count > 0 {
1531 self.backend.unemit_collectible(
1532 trait_type,
1533 collectible,
1534 count as u32,
1535 current_task("emitting collectible"),
1536 self,
1537 );
1538 }
1539 }
1540 }
1541
1542 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1543 self.try_read_own_task_cell(task, index)
1544 }
1545
1546 fn update_own_task_cell(
1547 &self,
1548 task: TaskId,
1549 index: CellId,
1550 content: CellContent,
1551 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1552 content_hash: Option<CellHash>,
1553 verification_mode: VerificationMode,
1554 ) {
1555 self.backend.update_task_cell(
1556 task,
1557 index,
1558 content,
1559 updated_key_hashes,
1560 content_hash,
1561 verification_mode,
1562 self,
1563 );
1564 }
1565
1566 fn connect_task(&self, task: TaskId) {
1567 self.backend
1568 .connect_task(task, current_task_if_available("connecting task"), self);
1569 }
1570
1571 fn mark_own_task_as_finished(&self, task: TaskId) {
1572 self.backend.mark_own_task_as_finished(task, self);
1573 }
1574
1575 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1576 self.backend.mark_own_task_as_session_dependent(task, self);
1577 }
1578
1579 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1582 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1585 global_task_state
1586 .write()
1587 .unwrap()
1588 .local_tasks
1589 .register_detached();
1590 let wrapped = async move {
1591 struct DropGuard;
1593 impl Drop for DropGuard {
1594 fn drop(&mut self) {
1595 CURRENT_TASK_STATE
1596 .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1597 }
1598 }
1599 let _guard = DropGuard;
1600 fut.await;
1601 };
1602 tokio::spawn(TURBO_TASKS.scope(
1603 turbo_tasks(),
1604 CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1605 ));
1606 }
1607
1608 fn task_statistics(&self) -> &TaskStatisticsApi {
1609 self.backend.task_statistics()
1610 }
1611
1612 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1613 let this = self.pin();
1614 Box::pin(async move {
1615 this.stop_and_wait().await;
1616 })
1617 }
1618
1619 fn subscribe_to_compilation_events(
1620 &self,
1621 event_types: Option<Vec<String>>,
1622 ) -> Receiver<Arc<dyn CompilationEvent>> {
1623 self.compilation_events.subscribe(event_types)
1624 }
1625
1626 fn is_tracking_dependencies(&self) -> bool {
1627 self.backend.is_tracking_dependencies()
1628 }
1629}
1630
1631impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1632 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1633 self.pin()
1634 }
1635 fn backend(&self) -> &B {
1636 &self.backend
1637 }
1638
1639 #[track_caller]
1640 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1641 self.schedule_background_job(async move |this| {
1642 this.backend.run_backend_job(job, &*this).await;
1643 this
1644 })
1645 }
1646
1647 #[track_caller]
1648 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1649 self.schedule_foreground_job(async move |this| {
1650 this.backend.run_backend_job(job, &*this).await;
1651 this
1652 })
1653 }
1654
1655 #[track_caller]
1656 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1657 self.schedule(task, priority)
1658 }
1659
1660 fn get_current_task_priority(&self) -> TaskPriority {
1661 CURRENT_TASK_STATE
1662 .try_with(|task_state| task_state.read().unwrap().priority)
1663 .unwrap_or(TaskPriority::initial())
1664 }
1665
1666 fn program_duration_until(&self, instant: Instant) -> Duration {
1667 instant - self.program_start
1668 }
1669
1670 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1671 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1673 }
1674
1675 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1676 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1678 }
1679
1680 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1681 unsafe { self.task_id_factory.reuse(id.into()) }
1682 }
1683
1684 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1685 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1686 }
1687
1688 fn is_idle(&self) -> bool {
1689 self.currently_scheduled_foreground_jobs
1690 .load(Ordering::Acquire)
1691 == 0
1692 }
1693}
1694
1695async fn wait_for_local_tasks() {
1696 let listener =
1697 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1698 let Some(listener) = listener else {
1699 return;
1700 };
1701 listener.await;
1702}
1703
1704pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1705 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1706 Ok(id) => id,
1707 Err(_) => panic!(
1708 "{from} can only be used in the context of a turbo_tasks task execution or \
1709 turbo_tasks run"
1710 ),
1711 }
1712}
1713
1714pub(crate) fn current_task(from: &str) -> TaskId {
1715 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1716 Ok(Some(id)) => id,
1717 Ok(None) | Err(_) => {
1718 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1719 }
1720 }
1721}
1722
1723#[track_caller]
1726pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1727 if !cfg!(debug_assertions) {
1728 return;
1729 }
1730
1731 let in_top_level = CURRENT_TASK_STATE
1732 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1733 .unwrap_or(true);
1734 if !in_top_level {
1735 panic!("{message}");
1736 }
1737}
1738
1739#[track_caller]
1740pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1741 if !cfg!(debug_assertions) {
1742 return;
1743 }
1744
1745 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1748 .try_with(|&suppressed| suppressed)
1749 .unwrap_or(false);
1750 if suppressed {
1751 return;
1752 }
1753
1754 let in_top_level = CURRENT_TASK_STATE
1755 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1756 .unwrap_or(false);
1757 if in_top_level {
1758 panic!(
1759 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1760 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1761 reads to avoid leaking inconsistent return values."
1762 );
1763 }
1764}
1765
1766pub async fn run<T: Send + 'static>(
1767 tt: Arc<dyn TurboTasksApi>,
1768 future: impl Future<Output = Result<T>> + Send + 'static,
1769) -> Result<T> {
1770 let (tx, rx) = tokio::sync::oneshot::channel();
1771
1772 tt.run(Box::pin(async move {
1773 let result = future.await?;
1774 tx.send(result)
1775 .map_err(|_| anyhow!("unable to send result"))?;
1776 Ok(())
1777 }))
1778 .await?;
1779
1780 Ok(rx.await?)
1781}
1782
1783pub async fn run_once<T: Send + 'static>(
1784 tt: Arc<dyn TurboTasksApi>,
1785 future: impl Future<Output = Result<T>> + Send + 'static,
1786) -> Result<T> {
1787 let (tx, rx) = tokio::sync::oneshot::channel();
1788
1789 tt.run_once(Box::pin(async move {
1790 let result = future.await?;
1791 tx.send(result)
1792 .map_err(|_| anyhow!("unable to send result"))?;
1793 Ok(())
1794 }))
1795 .await?;
1796
1797 Ok(rx.await?)
1798}
1799
1800pub async fn run_once_with_reason<T: Send + 'static>(
1801 tt: Arc<dyn TurboTasksApi>,
1802 reason: impl InvalidationReason,
1803 future: impl Future<Output = Result<T>> + Send + 'static,
1804) -> Result<T> {
1805 let (tx, rx) = tokio::sync::oneshot::channel();
1806
1807 tt.run_once_with_reason(
1808 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1809 Box::pin(async move {
1810 let result = future.await?;
1811 tx.send(result)
1812 .map_err(|_| anyhow!("unable to send result"))?;
1813 Ok(())
1814 }),
1815 )
1816 .await?;
1817
1818 Ok(rx.await?)
1819}
1820
1821pub fn dynamic_call(
1823 func: &'static NativeFunction,
1824 this: Option<RawVc>,
1825 arg: &mut dyn StackDynTaskInputs,
1826 persistence: TaskPersistence,
1827) -> RawVc {
1828 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1829}
1830
1831pub fn trait_call(
1833 trait_method: &'static TraitMethod,
1834 this: RawVc,
1835 arg: &mut dyn StackDynTaskInputs,
1836 persistence: TaskPersistence,
1837) -> RawVc {
1838 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1839}
1840
1841pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1842 TURBO_TASKS.with(|arc| arc.clone())
1843}
1844
1845pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1846 TURBO_TASKS.with(Arc::downgrade)
1847}
1848
1849pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1850 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1851}
1852
1853pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1854 TURBO_TASKS.with(|arc| func(arc))
1855}
1856
1857pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1858 TURBO_TASKS.sync_scope(tt, f)
1859}
1860
1861pub fn turbo_tasks_future_scope<T>(
1862 tt: Arc<dyn TurboTasksApi>,
1863 f: impl Future<Output = T>,
1864) -> impl Future<Output = T> {
1865 TURBO_TASKS.scope(tt, f)
1866}
1867
1868pub fn with_turbo_tasks_for_testing<T>(
1869 tt: Arc<dyn TurboTasksApi>,
1870 current_task: TaskId,
1871 execution_id: ExecutionId,
1872 f: impl Future<Output = T>,
1873) -> impl Future<Output = T> {
1874 TURBO_TASKS.scope(
1875 tt,
1876 CURRENT_TASK_STATE.scope(
1877 Arc::new(RwLock::new(CurrentTaskState::new(
1878 current_task,
1879 execution_id,
1880 TaskPriority::initial(),
1881 false, ))),
1883 f,
1884 ),
1885 )
1886}
1887
1888pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1893 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1894}
1895
1896pub fn current_task_for_testing() -> Option<TaskId> {
1897 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1898}
1899
1900pub fn mark_session_dependent() {
1902 with_turbo_tasks(|tt| {
1903 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1904 });
1905}
1906
1907pub fn mark_finished() {
1910 with_turbo_tasks(|tt| {
1911 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1912 });
1913}
1914
1915pub fn get_serialization_invalidator() -> SerializationInvalidator {
1921 CURRENT_TASK_STATE.with(|cell| {
1922 let CurrentTaskState {
1923 task_id,
1924 #[cfg(feature = "verify_determinism")]
1925 stateful,
1926 ..
1927 } = &mut *cell.write().unwrap();
1928 #[cfg(feature = "verify_determinism")]
1929 {
1930 *stateful = true;
1931 }
1932 let Some(task_id) = *task_id else {
1933 panic!(
1934 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1935 task execution"
1936 );
1937 };
1938 SerializationInvalidator::new(task_id)
1939 })
1940}
1941
1942pub fn mark_invalidator() {
1943 CURRENT_TASK_STATE.with(|cell| {
1944 let CurrentTaskState {
1945 has_invalidator, ..
1946 } = &mut *cell.write().unwrap();
1947 *has_invalidator = true;
1948 })
1949}
1950
/// Sets the `stateful` flag on the current task state.
///
/// This is a no-op unless the `verify_determinism` feature is enabled.
///
/// # Panics
///
/// With `verify_determinism` enabled, panics when called outside of a `CURRENT_TASK_STATE`
/// scope.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    {
        CURRENT_TASK_STATE.with(|cell| {
            cell.write().unwrap().stateful = true;
        });
    }
}
1966
1967pub fn mark_top_level_task() {
1971 if cfg!(debug_assertions) {
1972 CURRENT_TASK_STATE.with(|cell| {
1973 cell.write().unwrap().in_top_level_task = true;
1974 })
1975 }
1976}
1977
1978pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1989 if cfg!(debug_assertions) {
1990 CURRENT_TASK_STATE.with(|cell| {
1991 cell.write().unwrap().in_top_level_task = false;
1992 })
1993 }
1994}
1995
/// Currently a no-op: the body is empty.
///
/// NOTE(review): the name suggests it is meant to opt the current task out of garbage
/// collection once that exists — confirm intent.
pub fn prevent_gc() {
}
1999
2000pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
2001 with_turbo_tasks(|tt| {
2002 let raw_vc = collectible.node.node;
2003 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2004 })
2005}
2006
2007pub(crate) async fn read_task_output(
2008 this: &dyn TurboTasksApi,
2009 id: TaskId,
2010 options: ReadOutputOptions,
2011) -> Result<RawVc> {
2012 loop {
2013 match this.try_read_task_output(id, options)? {
2014 Ok(result) => return Ok(result),
2015 Err(listener) => listener.await,
2016 }
2017 }
2018}
2019
/// A reference to one cell of the currently executing task, identified by the owning task and
/// the cell id. Handed out by `find_cell_by_id` / `find_cell_by_type`.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owns the cell.
    current_task: TaskId,
    /// The cell's id (value type plus per-type index).
    index: CellId,
}
2030
/// Shorthand for the `Target` type that `T`'s [`VcRead`] implementation reads into.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2032
2033impl CurrentCellRef {
2034 fn conditional_update<T>(
2036 &self,
2037 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
2038 ) where
2039 T: VcValueType,
2040 {
2041 self.conditional_update_with_shared_reference(|old_shared_reference| {
2042 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2043 let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
2044 Some((
2045 SharedReference::new(triomphe::Arc::new(new_value)),
2046 updated_key_hashes,
2047 content_hash,
2048 ))
2049 })
2050 }
2051
2052 fn conditional_update_with_shared_reference(
2054 &self,
2055 functor: impl FnOnce(
2056 Option<&SharedReference>,
2057 ) -> Option<(
2058 SharedReference,
2059 Option<SmallVec<[u64; 2]>>,
2060 Option<CellHash>,
2061 )>,
2062 ) {
2063 let tt = turbo_tasks();
2064 let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
2065 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2066 if let Some((update, updated_key_hashes, content_hash)) = update {
2067 tt.update_own_task_cell(
2068 self.current_task,
2069 self.index,
2070 CellContent(Some(update)),
2071 updated_key_hashes,
2072 content_hash,
2073 VerificationMode::EqualityCheck,
2074 )
2075 }
2076 }
2077
2078 pub fn compare_and_update<T>(&self, new_value: T)
2112 where
2113 T: PartialEq + VcValueType,
2114 {
2115 self.conditional_update(|old_value| {
2116 if let Some(old_value) = old_value
2117 && old_value == &new_value
2118 {
2119 return None;
2120 }
2121 Some((new_value, None, None))
2122 });
2123 }
2124
2125 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2133 where
2134 T: VcValueType + PartialEq,
2135 {
2136 self.conditional_update_with_shared_reference(|old_sr| {
2137 if let Some(old_sr) = old_sr {
2138 let old_value = extract_sr_value::<T>(old_sr);
2139 let new_value = extract_sr_value::<T>(&new_shared_reference);
2140 if old_value == new_value {
2141 return None;
2142 }
2143 }
2144 Some((new_shared_reference, None, None))
2145 });
2146 }
2147
2148 pub fn hashed_compare_and_update<T>(&self, new_value: T)
2157 where
2158 T: PartialEq + DeterministicHash + VcValueType,
2159 {
2160 self.conditional_update(|old_value| {
2161 if let Some(old_value) = old_value
2162 && old_value == &new_value
2163 {
2164 return None;
2165 }
2166 let content_hash = hash_xxh3_hash128(&new_value);
2167 Some((new_value, None, Some(content_hash)))
2168 });
2169 }
2170
2171 pub fn hashed_compare_and_update_with_shared_reference<T>(
2177 &self,
2178 new_shared_reference: SharedReference,
2179 ) where
2180 T: VcValueType + PartialEq + DeterministicHash,
2181 {
2182 self.conditional_update_with_shared_reference(move |old_sr| {
2183 if let Some(old_sr) = old_sr {
2184 let old_value = extract_sr_value::<T>(old_sr);
2185 let new_value = extract_sr_value::<T>(&new_shared_reference);
2186 if old_value == new_value {
2187 return None;
2188 }
2189 }
2190 let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2191 Some((new_shared_reference, None, Some(content_hash)))
2192 });
2193 }
2194
2195 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2197 where
2198 T: PartialEq + VcValueType,
2199 VcReadTarget<T>: KeyedEq,
2200 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2201 {
2202 self.conditional_update(|old_value| {
2203 let Some(old_value) = old_value else {
2204 return Some((new_value, None, None));
2205 };
2206 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2207 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2208 let updated_keys = old_value.different_keys(new_value_ref);
2209 if updated_keys.is_empty() {
2210 return None;
2211 }
2212 let updated_key_hashes = updated_keys
2214 .into_iter()
2215 .map(|key| FxBuildHasher.hash_one(key))
2216 .collect();
2217 Some((new_value, Some(updated_key_hashes), None))
2218 });
2219 }
2220
2221 pub fn keyed_compare_and_update_with_shared_reference<T>(
2224 &self,
2225 new_shared_reference: SharedReference,
2226 ) where
2227 T: VcValueType + PartialEq,
2228 VcReadTarget<T>: KeyedEq,
2229 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2230 {
2231 self.conditional_update_with_shared_reference(|old_sr| {
2232 let Some(old_sr) = old_sr else {
2233 return Some((new_shared_reference, None, None));
2234 };
2235 let old_value = extract_sr_value::<T>(old_sr);
2236 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2237 let new_value = extract_sr_value::<T>(&new_shared_reference);
2238 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2239 let updated_keys = old_value.different_keys(new_value);
2240 if updated_keys.is_empty() {
2241 return None;
2242 }
2243 let updated_key_hashes = updated_keys
2245 .into_iter()
2246 .map(|key| FxBuildHasher.hash_one(key))
2247 .collect();
2248 Some((new_shared_reference, Some(updated_key_hashes), None))
2249 });
2250 }
2251
2252 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2254 where
2255 T: VcValueType,
2256 {
2257 let tt = turbo_tasks();
2258 tt.update_own_task_cell(
2259 self.current_task,
2260 self.index,
2261 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2262 None,
2263 None,
2264 verification_mode,
2265 )
2266 }
2267
2268 pub fn update_with_shared_reference(
2276 &self,
2277 shared_ref: SharedReference,
2278 verification_mode: VerificationMode,
2279 ) {
2280 let tt = turbo_tasks();
2281 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2282 let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2283 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2284 shared_ref_exp != shared_ref
2286 } else {
2287 true
2288 }
2289 } else {
2290 true
2291 };
2292 if update {
2293 tt.update_own_task_cell(
2294 self.current_task,
2295 self.index,
2296 CellContent(Some(shared_ref)),
2297 None,
2298 None,
2299 verification_mode,
2300 )
2301 }
2302 }
2303}
2304
2305impl From<CurrentCellRef> for RawVc {
2306 fn from(cell: CurrentCellRef) -> Self {
2307 RawVc::TaskCell(cell.current_task, cell.index)
2308 }
2309}
2310
2311fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2312 sr.0.downcast_ref::<T>()
2313 .expect("cannot update SharedReference of different type")
2314}
2315
2316pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2317 find_cell_by_id(T::get_value_type_id())
2318}
2319
2320pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2321 CURRENT_TASK_STATE.with(|ts| {
2322 let current_task = current_task("celling turbo_tasks values");
2323 let mut ts = ts.write().unwrap();
2324 let map = ts.cell_counters.as_mut().unwrap();
2325 let current_index = map.entry(ty).or_default();
2326 let index = *current_index;
2327 *current_index += 1;
2328 CurrentCellRef {
2329 current_task,
2330 index: CellId { type_id: ty, index },
2331 }
2332 })
2333}
2334
2335pub(crate) async fn read_local_output(
2336 this: &dyn TurboTasksApi,
2337 execution_id: ExecutionId,
2338 local_task_id: LocalTaskId,
2339) -> Result<RawVc> {
2340 loop {
2341 match this.try_read_local_output(execution_id, local_task_id)? {
2342 Ok(raw_vc) => return Ok(raw_vc),
2343 Err(event_listener) => event_listener.await,
2344 }
2345 }
2346}