1use std::{
2 cmp::Reverse,
3 fmt::{Debug, Display},
4 future::Future,
5 hash::{BuildHasher, BuildHasherDefault},
6 mem::take,
7 panic::AssertUnwindSafe,
8 pin::Pin,
9 process::abort,
10 sync::{
11 Arc, Mutex, RwLock, Weak,
12 atomic::{AtomicBool, AtomicUsize, Ordering},
13 },
14 time::{Duration, Instant},
15};
16
17use anyhow::{Result, anyhow};
18use auto_hash_map::AutoMap;
19use bincode::{Decode, Encode};
20use either::Either;
21use futures::FutureExt;
22use rustc_hash::{FxBuildHasher, FxHasher};
23use serde::{Deserialize, Serialize};
24use smallvec::SmallVec;
25use tokio::{select, sync::mpsc::Receiver, task_local};
26use tracing::{Instrument, Span, instrument};
27use turbo_tasks_hash::{DeterministicHash, hash_xxh3_hash128};
28
29use crate::{
30 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
31 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
32 VcValueTrait, VcValueType,
33 backend::{
34 Backend, CellContent, CellHash, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType,
35 TurboTasksExecutionError, TypedCellContent, VerificationMode,
36 },
37 capture_future::CaptureFuture,
38 dyn_task_inputs::StackDynTaskInputs,
39 event::{Event, EventListener},
40 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
41 id_factory::IdFactoryWithReuse,
42 keyed::KeyedEq,
43 local_task_tracker::LocalTaskTracker,
44 macro_helpers::NativeFunction,
45 message_queue::{CompilationEvent, CompilationEventQueue},
46 priority_runner::{Executor, PriorityRunner},
47 raw_vc::{CellId, RawVc},
48 registry,
49 serialization_invalidation::SerializationInvalidator,
50 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
51 task_statistics::TaskStatisticsApi,
52 trace::TraceRawVcs,
53 util::{IdFactory, StaticOrArc},
54};
55
/// Object-safe API for calling turbo-tasks functions and for driving top-level futures.
///
/// `TurboTasks<B>` implements this trait by forwarding to its inherent methods of the same
/// names.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with the optional receiver `this` and the arguments in `arg`.
    ///
    /// In the `TurboTasks` implementation this goes straight to `native_call` when the receiver
    /// and the arguments are already resolved; otherwise a local resolve task is scheduled and
    /// its output is returned.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` directly.
    ///
    /// In the `TurboTasks` implementation this asks the backend to get or create the task and
    /// returns a `RawVc::TaskOutput` pointing at it.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on the receiver `this`.
    ///
    /// In the `TurboTasks` implementation the method is looked up in the registry when `this`
    /// is a `RawVc::TaskCell`; otherwise (or when the lookup finds nothing) a local
    /// trait-resolve task is scheduled.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: &mut dyn StackDynTaskInputs,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` and reports failures (including captured panics) as a
    /// `TurboTasksExecutionError`.
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` inside a transient "once" task and resolves with its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`run_once`](Self::run_once), but additionally records `reason`.
    ///
    /// In the `TurboTasks` implementation the reason is inserted into the aggregated update's
    /// reason set before the future is started.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` in the background via a "once" task without returning a handle to it.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Sends a compilation event.
    ///
    /// In the `TurboTasks` implementation a send failure is only logged as a warning.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns a name for `task` (in the `TurboTasks` implementation, as reported by the
    /// backend).
    fn get_task_name(&self, task: TaskId) -> String;
}
108
/// Object-safe API used by code running inside turbo-tasks to interact with the task graph.
///
/// NOTE(review): most implementations of these methods are outside this part of the file, so
/// the descriptions below are derived from the signatures only and should be confirmed against
/// the implementation.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`. The `TurboTasks` implementation forwards this to the backend.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason`.
    ///
    /// The `TurboTasks` implementation adds the reason to the aggregated update's reason set
    /// before forwarding the invalidation to the backend.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task` (exact semantics are backend-defined — confirm).
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read the output of `task`.
    ///
    /// The inner `Err(EventListener)` presumably means the output is not available yet and the
    /// listener can be awaited before retrying — confirm against the implementation.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read the cell `index` of `task`.
    ///
    /// The inner `Err(EventListener)` presumably means the cell is not available yet — confirm.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Attempts to read the output of the local task `local_task_id` that belongs to the
    /// execution `execution_id`.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of trait `trait_id` associated with `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type`.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Un-emits `collectible` for the trait `trait_type`, `count` times (presumably — confirm).
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Un-emits every entry of `collectibles` for the trait `trait_type`.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Attempts to read the cell `index` of the currently executing task `current_task`.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` of the task's own cells.
    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
    /// Writes `content` into the cell `index` of the task's own cells.
    ///
    /// `updated_key_hashes`, `content_hash` and `verification_mode` are passed along with the
    /// update; their interpretation is not visible here.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
        content_hash: Option<CellHash>,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Marks `task` as session dependent.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task` (presumably to the currently executing task — confirm).
    fn connect_task(&self, task: TaskId);

    /// Spawns `f` detached; intended for tests only, as the name says.
    fn spawn_detached_for_testing(&self, f: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

    /// Returns the task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops turbo-tasks and resolves once stopping has completed.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events.
    ///
    /// `event_types` presumably restricts the subscription to the named event types, with
    /// `None` meaning all events — confirm.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;

    /// Returns whether dependencies are being tracked.
    fn is_tracking_dependencies(&self) -> bool;
}
200
/// Wrapper marking a value as "unused".
///
/// NOTE(review): the exact invariant behind "unused" is not spelled out in this part of the
/// file; in this file the wrapper carries freshly allocated task ids.
pub struct Unused<T> {
    /// The wrapped value.
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` without checking anything.
    ///
    /// # Safety
    ///
    /// The caller vouches that `inner` really is unused (presumably: not referenced anywhere
    /// else yet — confirm the contract with the callers).
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value.
    ///
    /// # Safety
    ///
    /// The caller must not use the returned reference in a way that breaks the "unused"
    /// guarantee (exact contract to be confirmed).
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and hands out the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
230
/// API that a [`Backend`] implementation can use to talk back to turbo-tasks.
///
/// NOTE(review): the implementation of this trait is outside this part of the file; the
/// descriptions below follow the signatures and should be confirmed.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owned (`Arc`) handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Hands out a fresh task id for a persistent task, wrapped in [`Unused`].
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Hands out a fresh task id for a transient task, wrapped in [`Unused`].
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// Presumably the id must no longer be in use anywhere — confirm the exact contract.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// Presumably the id must no longer be in use anywhere — confirm the exact contract.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution with the given `priority`.
    fn schedule(&self, task: TaskId, priority: TaskPriority);

    /// Returns the priority of the currently executing task.
    fn get_current_task_priority(&self) -> TaskPriority;

    /// Schedules a backend job as a foreground job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend job as a background job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns a duration relating program start to `instant` (presumably the time elapsed
    /// from program start until `instant` — confirm).
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns whether turbo-tasks is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns the backend.
    fn backend(&self) -> &B;
}
270
/// Summary of the work done since the last time update information was collected.
///
/// The private placeholder field prevents construction outside this module, so more fields
/// can be added later without breaking users.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated wall-clock time of the foreground activity periods covered by this update.
    pub duration: Duration,
    /// Number of tasks that were scheduled during the covered periods.
    pub tasks: usize,
    /// Invalidation reasons recorded since the previous update was taken.
    pub reasons: InvalidationReasonSet,
    /// Never read; only exists to keep the struct non-constructible from outside.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
279
/// Persistence class of a task; passed along with every function call.
///
/// NOTE(review): the precise caching semantics of each variant are defined by the backend and
/// are not visible in this part of the file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// The task is persistent.
    Persistent,

    /// The task is transient.
    Transient,
}
293
294impl Display for TaskPersistence {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 match self {
297 TaskPersistence::Persistent => write!(f, "persistent"),
298 TaskPersistence::Transient => write!(f, "transient"),
299 }
300 }
301}
302
/// Consistency level requested when reading a task output.
///
/// NOTE(review): the guarantees attached to each level are implemented elsewhere.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// Eventually consistent read. This is the default.
    #[default]
    Eventual,
    /// Strongly consistent read.
    Strong,
}
315
/// Dependency-tracking mode for cell reads.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadCellTracking {
    /// The read is always tracked.
    Tracked {
        /// Optional key hash attached to the tracked read.
        key: Option<u64>,
    },
    /// The read is tracked only when it resulted in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadCellTracking {
    /// Decides whether a read should be tracked, given whether it failed (`is_err`).
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked { .. } => true,
        }
    }

    /// Returns the key of a [`Tracked`](Self::Tracked) read; `None` for every other mode.
    pub fn key(&self) -> Option<u64> {
        if let Self::Tracked { key } = self {
            *key
        } else {
            None
        }
    }
}

/// The default mode is a tracked read without a key.
impl Default for ReadCellTracking {
    fn default() -> Self {
        Self::Tracked { key: None }
    }
}

/// Human-readable description of the tracking mode.
impl Display for ReadCellTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"),
            Self::Tracked { key: None } => f.write_str("tracked"),
            Self::TrackOnlyError => f.write_str("track only error"),
            Self::Untracked => f.write_str("untracked"),
        }
    }
}
369
/// Dependency-tracking mode for reads that carry no key.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// The read is always tracked. This is the default.
    #[default]
    Tracked,
    /// The read is tracked only when it resulted in an error.
    TrackOnlyError,
    /// The read is never tracked.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether it failed (`is_err`).
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

/// Human-readable description of the tracking mode.
impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
406
/// Scheduling priority of a task.
///
/// The derived ordering sorts `Initial` before every `Invalidation`; among `Invalidation`
/// values the `Reverse` wrapper makes a larger number compare as smaller. How the priority
/// runner maps this ordering to "runs first" is not visible here.
#[derive(Encode, Decode, Default, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum TaskPriority {
    /// Priority used for initially scheduled work. This is the default.
    #[default]
    Initial,
    /// Priority used for work caused by an invalidation.
    Invalidation {
        /// Numeric level, stored reversed so that the derived ordering is inverted.
        priority: Reverse<u32>,
    },
}
415
416impl TaskPriority {
417 pub fn invalidation(priority: u32) -> Self {
418 Self::Invalidation {
419 priority: Reverse(priority),
420 }
421 }
422
423 pub fn initial() -> Self {
424 Self::Initial
425 }
426
427 pub fn leaf() -> Self {
428 Self::Invalidation {
429 priority: Reverse(0),
430 }
431 }
432
433 pub fn in_parent(&self, parent_priority: TaskPriority) -> Self {
434 match self {
435 TaskPriority::Initial => parent_priority,
436 TaskPriority::Invalidation { priority } => {
437 if let TaskPriority::Invalidation {
438 priority: parent_priority,
439 } = parent_priority
440 && priority.0 < parent_priority.0
441 {
442 Self::Invalidation {
443 priority: Reverse(parent_priority.0.saturating_add(1)),
444 }
445 } else {
446 *self
447 }
448 }
449 }
450 }
451}
452
453impl Display for TaskPriority {
454 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455 match self {
456 TaskPriority::Initial => write!(f, "initial"),
457 TaskPriority::Invalidation { priority } => write!(f, "invalidation({})", priority.0),
458 }
459 }
460}
461
/// Unit of work handed to the priority runner.
enum ScheduledTask {
    /// Execution of a regular task.
    Task {
        /// Task to execute.
        task_id: TaskId,
        /// Tracing span that was current when the task was scheduled.
        span: Span,
    },
    /// Execution of a local (resolve) task that belongs to a parent task's execution.
    LocalTask {
        /// What the local task has to resolve, including receiver and arguments.
        ty: LocalTaskSpec,
        /// Persistence passed on to the resolved call.
        persistence: TaskPersistence,
        /// Id under which the result is recorded in the parent's local task tracker.
        local_task_id: LocalTaskId,
        /// Shared state of the parent task; the local task runs with this as its
        /// `CURRENT_TASK_STATE`.
        global_task_state: Arc<RwLock<CurrentTaskState>>,
        /// Tracing span that was current when the local task was scheduled.
        span: Span,
    },
}
475
/// Central turbo-tasks instance: owns the backend and schedules task execution.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference (set up via `Arc::new_cyclic`), upgraded by `pin()`.
    this: Weak<Self>,
    /// The backend all task operations are delegated to.
    backend: B,
    /// Id factory created for the range below `TRANSIENT_TASK_BIT`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory created for the range starting at `TRANSIENT_TASK_BIT`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Source of execution ids; one is drawn per `run` call and per task execution.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; checked before tasks and jobs start running.
    stopped: AtomicBool,
    /// Number of foreground jobs currently in flight.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs currently in flight.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of `schedule` calls since the counter was last reset when foreground work
    /// finished; reported as `UpdateInfo::tasks`.
    scheduled_tasks: AtomicUsize,
    /// Runner that executes `ScheduledTask`s by priority.
    priority_runner:
        Arc<PriorityRunner<TurboTasks<B>, ScheduledTask, TaskPriority, TurboTasksExecutor>>,
    /// Instant at which the foreground job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` plus the invalidation reasons collected so far.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job count goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job count drops back to zero.
    event_foreground_done: Event,
    /// Notified when the background job count drops back to zero.
    event_background_done: Event,
    /// Instant at which this instance was created.
    program_start: Instant,
    /// Queue used to deliver compilation events.
    compilation_events: CompilationEventQueue,
}
500
/// Per-execution state of the task (or temporary top-level future) that is currently running.
///
/// It is shared through the `CURRENT_TASK_STATE` task-local, also with the local tasks that
/// were spawned by this execution.
struct CurrentTaskState {
    /// Id of the executing task; `None` for temporary states created by `new_temporary`.
    task_id: Option<TaskId>,
    /// Id of this particular execution; compared by `assert_execution_id`.
    execution_id: ExecutionId,
    /// Priority the execution was scheduled with; local tasks are scheduled with it as well.
    priority: TaskPriority,

    /// NOTE(review): presumably records whether the task created state; only compiled with the
    /// `verify_determinism` feature and passed to the backend on completion.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Passed to the backend when the execution completes (presumably: whether an invalidator
    /// was created during the execution — confirm).
    has_invalidator: bool,

    /// `true` for states created by `run`, `false` for regular task executions.
    in_top_level_task: bool,

    /// Per value type cell counters; `None` for temporary states and after they were taken at
    /// the end of the execution.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Tracks the local tasks created by this execution and their results.
    local_tasks: LocalTaskTracker,
}
536
537impl CurrentTaskState {
538 fn new(
539 task_id: TaskId,
540 execution_id: ExecutionId,
541 priority: TaskPriority,
542 in_top_level_task: bool,
543 ) -> Self {
544 Self {
545 task_id: Some(task_id),
546 execution_id,
547 priority,
548 #[cfg(feature = "verify_determinism")]
549 stateful: false,
550 has_invalidator: false,
551 in_top_level_task,
552 cell_counters: Some(AutoMap::default()),
553 local_tasks: LocalTaskTracker::new(),
554 }
555 }
556
557 fn new_temporary(
558 execution_id: ExecutionId,
559 priority: TaskPriority,
560 in_top_level_task: bool,
561 ) -> Self {
562 Self {
563 task_id: None,
564 execution_id,
565 priority,
566 #[cfg(feature = "verify_determinism")]
567 stateful: false,
568 has_invalidator: false,
569 in_top_level_task,
570 cell_counters: None,
571 local_tasks: LocalTaskTracker::new(),
572 }
573 }
574
575 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
576 if self.execution_id != expected_execution_id {
577 panic!(
578 "Local tasks can only be scheduled/awaited within the same execution of the \
579 parent task that created them"
580 );
581 }
582 }
583}
584
task_local! {
    /// The turbo-tasks instance the current tokio task runs under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the task execution the current tokio task belongs to.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;

    /// NOTE(review): going by its name, this flag suppresses the check that eventually
    /// consistent reads only happen in top-level tasks; its users are outside this part of
    /// the file.
    pub(crate) static SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK: bool;
}
598
599impl<B: Backend + 'static> TurboTasks<B> {
    /// Creates a new turbo-tasks instance around `backend` and notifies the backend via
    /// `startup`.
    ///
    /// The instance is created with `Arc::new_cyclic` so that it can keep a weak reference to
    /// itself (see `pin`).
    pub fn new(backend: B) -> Arc<Self> {
        // Ids for persistent tasks: constructed with the bounds
        // `TaskId::MIN` and `TRANSIENT_TASK_BIT - 1`.
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        // Ids for transient tasks: constructed with the bounds
        // `TRANSIENT_TASK_BIT` and `TaskId::MAX`.
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            priority_runner: Arc::new(PriorityRunner::new(TurboTasksExecutor)),
            start: Default::default(),
            aggregated_update: Default::default(),
            // The closures passed to `Event::new` produce the event's description.
            event_foreground_done: Event::new(|| {
                || "TurboTasks::event_foreground_done".to_string()
            }),
            event_foreground_start: Event::new(|| {
                || "TurboTasks::event_foreground_start".to_string()
            }),
            event_background_done: Event::new(|| {
                || "TurboTasks::event_background_done".to_string()
            }),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        // The backend is only informed once the instance is fully constructed.
        this.backend.startup(&*this);
        this
    }
641
642 pub fn pin(&self) -> Arc<Self> {
643 self.this.upgrade().unwrap()
644 }
645
646 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
648 where
649 T: ?Sized,
650 F: Fn() -> Fut + Send + Sync + Clone + 'static,
651 Fut: Future<Output = Result<Vc<T>>> + Send,
652 {
653 let id = self.backend.create_transient_task(
654 TransientTaskType::Root(Box::new(move || {
655 let functor = functor.clone();
656 Box::pin(async move {
657 mark_top_level_task();
658 let raw_vc = functor().await?.node;
659 raw_vc.to_non_local().await
660 })
661 })),
662 self,
663 );
664 self.schedule(id, TaskPriority::initial());
665 id
666 }
667
668 pub fn dispose_root_task(&self, task_id: TaskId) {
669 self.backend.dispose_root_task(task_id, self);
670 }
671
672 #[track_caller]
676 fn spawn_once_task<T, Fut>(&self, future: Fut)
677 where
678 T: ?Sized,
679 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
680 {
681 let id = self.backend.create_transient_task(
682 TransientTaskType::Once(Box::pin(async move {
683 mark_top_level_task();
684 let raw_vc = future.await?.node;
685 raw_vc.to_non_local().await
686 })),
687 self,
688 );
689 self.schedule(id, TaskPriority::initial());
690 }
691
692 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
693 &self,
694 future: impl Future<Output = Result<T>> + Send + 'static,
695 ) -> Result<T> {
696 let (tx, rx) = tokio::sync::oneshot::channel();
697 self.spawn_once_task(async move {
698 mark_top_level_task();
699 let result = future.await;
700 tx.send(result)
701 .map_err(|_| anyhow!("unable to send result"))?;
702 Ok(Completion::new())
703 });
704
705 rx.await?
706 }
707
    /// Runs `future` directly (not as a task) in a turbo-tasks context and returns its result.
    ///
    /// The call counts as a foreground job for its whole duration. Errors returned by the
    /// future are converted into `TurboTasksExecutionError`, and a failure captured by
    /// `CaptureFuture` is reported as `TurboTasksExecutionError::Panic`.
    #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
    pub async fn run<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T, TurboTasksExecutionError> {
        self.begin_foreground_job();
        let execution_id = self.execution_id_factory.wrapping_get();
        // There is no task behind this execution, so a temporary state (no task id, no cell
        // counters) is used; the last argument marks it as a top-level task.
        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new_temporary(
            execution_id,
            TaskPriority::initial(),
            true,
        )));

        let result = TURBO_TASKS
            .scope(
                self.pin(),
                CURRENT_TASK_STATE.scope(current_task_state, async {
                    let result = CaptureFuture::new(future).await;

                    // Wait for the local tasks of this execution before reporting the result.
                    wait_for_local_tasks().await;

                    match result {
                        Ok(Ok(value)) => Ok(value),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    }
                }),
            )
            .await;
        self.finish_foreground_job();
        result
    }
742
    /// Spawns `future` on the tokio runtime, wrapped in a "once" task.
    ///
    /// # Panics
    ///
    /// The spawned tokio task panics (via `unwrap`) when `run_once` returns an error.
    pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
        let this = self.pin();
        tokio::spawn(async move {
            // `this` is moved into the inner future below, so a second handle is taken via
            // `pin()` for the `run_once` call itself.
            this.pin()
                .run_once(async move {
                    // The foreground job count is decremented while `future` runs and
                    // incremented again afterwards — presumably so that a long-running
                    // process does not keep turbo-tasks from being considered idle (confirm).
                    this.finish_foreground_job();
                    future.await;
                    this.begin_foreground_job();
                    Ok(())
                })
                .await
                .unwrap()
        });
    }
757
758 pub(crate) fn native_call(
759 &self,
760 native_fn: &'static NativeFunction,
761 this: Option<RawVc>,
762 arg: &mut dyn StackDynTaskInputs,
763 persistence: TaskPersistence,
764 ) -> RawVc {
765 RawVc::TaskOutput(self.backend.get_or_create_task(
766 native_fn,
767 this,
768 arg,
769 current_task_if_available("turbo_function calls"),
770 persistence,
771 self,
772 ))
773 }
774
775 pub fn dynamic_call(
776 &self,
777 native_fn: &'static NativeFunction,
778 this: Option<RawVc>,
779 arg: &mut dyn StackDynTaskInputs,
780 persistence: TaskPersistence,
781 ) -> RawVc {
782 if this.is_none_or(|this| this.is_resolved())
783 && native_fn.arg_meta.is_resolved(arg.as_ref())
784 {
785 return self.native_call(native_fn, this, arg, persistence);
786 }
787 let arg = arg.take_box();
789 let task_type = LocalTaskSpec {
790 task_type: LocalTaskType::ResolveNative { native_fn },
791 this,
792 arg,
793 };
794 self.schedule_local_task(task_type, persistence)
795 }
796
797 pub fn trait_call(
798 &self,
799 trait_method: &'static TraitMethod,
800 this: RawVc,
801 arg: &mut dyn StackDynTaskInputs,
802 persistence: TaskPersistence,
803 ) -> RawVc {
804 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
808 match registry::get_value_type(type_id).get_trait_method(trait_method) {
809 Some(native_fn) => {
810 if let Some(filter) = native_fn.arg_meta.filter_owned {
811 let mut arg = (filter)(arg);
812 return self.dynamic_call(native_fn, Some(this), &mut arg, persistence);
813 } else {
814 return self.dynamic_call(native_fn, Some(this), arg, persistence);
815 }
816 }
817 None => {
818 }
822 }
823 }
824
825 let task_type = LocalTaskSpec {
827 task_type: LocalTaskType::ResolveTrait { trait_method },
828 this: Some(this),
829 arg: arg.take_box(),
830 };
831
832 self.schedule_local_task(task_type, persistence)
833 }
834
835 #[track_caller]
836 pub(crate) fn schedule(&self, task_id: TaskId, priority: TaskPriority) {
837 self.begin_foreground_job();
838 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
839
840 self.priority_runner.schedule(
841 &self.pin(),
842 ScheduledTask::Task {
843 task_id,
844 span: Span::current(),
845 },
846 priority,
847 );
848 }
849
850 fn schedule_local_task(
851 &self,
852 ty: LocalTaskSpec,
853 persistence: TaskPersistence,
855 ) -> RawVc {
856 let task_type = ty.task_type;
857 let (global_task_state, execution_id, priority, local_task_id) =
858 CURRENT_TASK_STATE.with(|gts| {
859 let mut gts_write = gts.write().unwrap();
860 let local_task_id = gts_write.local_tasks.create(task_type);
861 (
862 Arc::clone(gts),
863 gts_write.execution_id,
864 gts_write.priority,
865 local_task_id,
866 )
867 });
868
869 self.priority_runner.schedule(
870 &self.pin(),
871 ScheduledTask::LocalTask {
872 ty,
873 persistence,
874 local_task_id,
875 global_task_state,
876 span: Span::current(),
877 },
878 priority,
879 );
880
881 RawVc::LocalOutput(execution_id, local_task_id, persistence)
882 }
883
884 fn begin_foreground_job(&self) {
885 if self
886 .currently_scheduled_foreground_jobs
887 .fetch_add(1, Ordering::AcqRel)
888 == 0
889 {
890 *self.start.lock().unwrap() = Some(Instant::now());
891 self.event_foreground_start.notify(usize::MAX);
892 self.backend.idle_end(self);
893 }
894 }
895
896 fn finish_foreground_job(&self) {
897 if self
898 .currently_scheduled_foreground_jobs
899 .fetch_sub(1, Ordering::AcqRel)
900 == 1
901 {
902 self.backend.idle_start(self);
903 let total = self.scheduled_tasks.load(Ordering::Acquire);
906 self.scheduled_tasks.store(0, Ordering::Release);
907 if let Some(start) = *self.start.lock().unwrap() {
908 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
909 if let Some(update) = update.as_mut() {
910 update.0 += start.elapsed();
911 update.1 += total;
912 } else {
913 *update = Some((start.elapsed(), total));
914 }
915 }
916 self.event_foreground_done.notify(usize::MAX);
917 }
918 }
919
920 fn begin_background_job(&self) {
921 self.currently_scheduled_background_jobs
922 .fetch_add(1, Ordering::Relaxed);
923 }
924
925 fn finish_background_job(&self) {
926 if self
927 .currently_scheduled_background_jobs
928 .fetch_sub(1, Ordering::Relaxed)
929 == 1
930 {
931 self.event_background_done.notify(usize::MAX);
932 }
933 }
934
935 pub fn get_in_progress_count(&self) -> usize {
936 self.currently_scheduled_foreground_jobs
937 .load(Ordering::Acquire)
938 }
939
940 pub async fn wait_task_completion(
952 &self,
953 id: TaskId,
954 consistency: ReadConsistency,
955 ) -> Result<()> {
956 read_task_output(
957 self,
958 id,
959 ReadOutputOptions {
960 tracking: ReadTracking::Untracked,
962 consistency,
963 },
964 )
965 .await?;
966 Ok(())
967 }
968
969 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
972 self.aggregated_update_info(aggregation, Duration::MAX)
973 .await
974 .unwrap()
975 }
976
    /// Takes the aggregated update info, waiting for foreground work to finish when none is
    /// available yet.
    ///
    /// * `aggregation`: when non-zero, the method keeps collecting until this much time has
    ///   passed without another "foreground done" event.
    /// * `timeout`: upper bound for waiting on the first "foreground done" event;
    ///   `Duration::MAX` waits without limit.
    ///
    /// Returns `None` when the timeout is zero or expires.
    ///
    /// # Panics
    ///
    /// Panics when no update is available after waiting, which the panic message attributes
    /// to concurrent calls of this method.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        // The listener is created before the state is inspected (presumably so that a
        // notification arriving between the check and the await is not missed — confirm with
        // the `Event` implementation).
        let listener = self
            .event_foreground_done
            .listen_with_note(|| || "wait for update info".to_string());
        let wait_for_finish = {
            // The lock guard is confined to this block and dropped before any await.
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                // Without aggregation an available update is returned right away.
                if let Some((duration, tasks)) = update.take() {
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                // Wait for foreground work to finish without a time limit.
                listener.await;
            } else {
                let start_listener = self
                    .event_foreground_start
                    .listen_with_note(|| || "wait for update info".to_string());
                // When nothing is running, first wait (without limit) for work to start; the
                // timeout only applies to the wait for completion below.
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    == 0
                {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            // Each loop iteration starts a fresh sleep, so the loop only ends once
            // `aggregation` has passed without another "foreground done" event.
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                        // More work finished: go around again and restart the sleep.
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not called concurrently")
        }
    }
1053
1054 pub async fn wait_background_done(&self) {
1055 let listener = self.event_background_done.listen();
1056 if self
1057 .currently_scheduled_background_jobs
1058 .load(Ordering::Acquire)
1059 != 0
1060 {
1061 listener.await;
1062 }
1063 }
1064
1065 pub async fn stop_and_wait(&self) {
1066 turbo_tasks_future_scope(self.pin(), async move {
1067 self.backend.stopping(self);
1068 self.stopped.store(true, Ordering::Release);
1069 {
1070 let listener = self
1071 .event_foreground_done
1072 .listen_with_note(|| || "wait for stop".to_string());
1073 if self
1074 .currently_scheduled_foreground_jobs
1075 .load(Ordering::Acquire)
1076 != 0
1077 {
1078 listener.await;
1079 }
1080 }
1081 {
1082 let listener = self.event_background_done.listen();
1083 if self
1084 .currently_scheduled_background_jobs
1085 .load(Ordering::Acquire)
1086 != 0
1087 {
1088 listener.await;
1089 }
1090 }
1091 self.backend.stop(self);
1092 })
1093 .await;
1094 }
1095
1096 #[track_caller]
1097 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1098 where
1099 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1100 T::CallOnceFuture: Send,
1101 {
1102 let mut this = self.pin();
1103 this.begin_foreground_job();
1104 tokio::spawn(
1105 TURBO_TASKS
1106 .scope(this.clone(), async move {
1107 if !this.stopped.load(Ordering::Acquire) {
1108 this = func(this.clone()).await;
1109 }
1110 this.finish_foreground_job();
1111 })
1112 .in_current_span(),
1113 );
1114 }
1115
1116 #[track_caller]
1117 pub(crate) fn schedule_background_job<T>(&self, func: T)
1118 where
1119 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1120 T::CallOnceFuture: Send,
1121 {
1122 let mut this = self.pin();
1123 self.begin_background_job();
1124 tokio::spawn(
1125 TURBO_TASKS
1126 .scope(this.clone(), async move {
1127 if !this.stopped.load(Ordering::Acquire) {
1128 this = func(this).await;
1129 }
1130 this.finish_background_job();
1131 })
1132 .in_current_span(),
1133 );
1134 }
1135
1136 fn finish_current_task_state(&self) -> FinishedTaskState {
1137 CURRENT_TASK_STATE.with(|cell| {
1138 let current_task_state = &*cell.write().unwrap();
1139 FinishedTaskState {
1140 #[cfg(feature = "verify_determinism")]
1141 stateful: current_task_state.stateful,
1142 has_invalidator: current_task_state.has_invalidator,
1143 }
1144 })
1145 }
1146
1147 pub fn backend(&self) -> &B {
1148 &self.backend
1149 }
1150}
1151
/// Stateless executor that the priority runner uses to turn a `ScheduledTask` into a future.
struct TurboTasksExecutor;
1153
1154async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1159 match AssertUnwindSafe(f).catch_unwind().await {
1160 Ok(r) => r,
1161 Err(_) => {
1162 eprintln!(
1163 "\nturbo-tasks: an internal panic occurred outside the per-task panic \
1164 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
1165 https://github.com/vercel/next.js/discussions and include the panic message \
1166 and stack trace above.\n\nAborting."
1167 );
1168 abort();
1169 }
1170 }
1171}
1172
/// Turns scheduled work into futures for the priority runner.
///
/// Both kinds of work run with `TURBO_TASKS` and `CURRENT_TASK_STATE` set, inside the span
/// that was captured at scheduling time, and wrapped in `abort_on_panic`.
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
    type Future = impl Future<Output = ()> + Send + 'static;

    /// Builds the future that executes `scheduled_task` with the given `priority`.
    fn execute(
        &self,
        this: &Arc<TurboTasks<B>>,
        scheduled_task: ScheduledTask,
        priority: TaskPriority,
    ) -> Self::Future {
        match scheduled_task {
            ScheduledTask::Task { task_id, span } => {
                // One handle goes into the `TURBO_TASKS` scope, the other into the future.
                let this2 = this.clone();
                let this = this.clone();
                let future = async move {
                    abort_on_panic(async {
                        let execution_id = this.execution_id_factory.wrapping_get();
                        // Fresh state per execution; the last argument says this is not a
                        // top-level task.
                        let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                            task_id,
                            execution_id,
                            priority,
                            false,
                        )));
                        // Yields `Some(priority)` when the task has to be scheduled again
                        // and `None` otherwise.
                        let single_execution_future = async {
                            if this.stopped.load(Ordering::Acquire) {
                                this.backend.task_execution_canceled(task_id, &*this);
                                return None;
                            }

                            // `?` ends the execution with `None` when the backend does not
                            // start the task.
                            let TaskExecutionSpec { future, span } = this
                                .backend
                                .try_start_task_execution(task_id, priority, &*this)?;

                            async {
                                let result = CaptureFuture::new(future).await;

                                // Wait for the local tasks of this execution before the
                                // result is processed.
                                wait_for_local_tasks().await;

                                let result = match result {
                                    Ok(Ok(raw_vc)) => {
                                        raw_vc
                                            .to_non_local_unchecked_sync(&*this)
                                            .map_err(|err| err.into())
                                    }
                                    Ok(Err(err)) => Err(err.into()),
                                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                                };

                                let finished_state = this.finish_current_task_state();
                                // `CurrentTaskState::new` always sets the cell counters, and
                                // they are only taken here.
                                let cell_counters = CURRENT_TASK_STATE
                                    .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                                this.backend.task_execution_completed(
                                    task_id,
                                    result,
                                    &cell_counters,
                                    #[cfg(feature = "verify_determinism")]
                                    finished_state.stateful,
                                    finished_state.has_invalidator,
                                    &*this,
                                )
                            }
                            .instrument(span)
                            .await
                        };
                        if let Some(stale_priority) = CURRENT_TASK_STATE
                            .scope(current_task_state, single_execution_future)
                            .await
                        {
                            // The backend asked for another execution; `schedule` opens its
                            // own foreground job for it.
                            this.schedule(task_id, stale_priority);
                        }
                        // Closes the foreground job opened by the `schedule` call that led to
                        // this execution.
                        this.finish_foreground_job();
                    })
                    .await
                };

                Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
            }
            ScheduledTask::LocalTask {
                ty,
                persistence,
                local_task_id,
                global_task_state,
                span,
            } => {
                let this2 = this.clone();
                let this = this.clone();
                // Copied out because `ty` is consumed piecewise below.
                let task_type = ty.task_type;
                let future = async move {
                    // Span for the resolve step itself; it shadows the scheduling span, which
                    // is applied to the outer future further down.
                    let span = match &ty.task_type {
                        LocalTaskType::ResolveNative { native_fn } => {
                            native_fn.resolve_span(priority)
                        }
                        LocalTaskType::ResolveTrait { trait_method } => {
                            trait_method.resolve_span(priority)
                        }
                    };
                    abort_on_panic(
                        async move {
                            let result = match ty.task_type {
                                LocalTaskType::ResolveNative { native_fn } => {
                                    LocalTaskType::run_resolve_native(
                                        native_fn,
                                        ty.this,
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                                LocalTaskType::ResolveTrait { trait_method } => {
                                    LocalTaskType::run_resolve_trait(
                                        trait_method,
                                        // `trait_call` always sets the receiver for
                                        // `ResolveTrait` specs.
                                        ty.this.unwrap(),
                                        &*ty.arg,
                                        persistence,
                                        this,
                                    )
                                    .await
                                }
                            };

                            let output = match result {
                                Ok(raw_vc) => OutputContent::Link(raw_vc),
                                Err(err) => OutputContent::Error(
                                    TurboTasksExecutionError::from(err)
                                        .with_local_task_context(task_type.to_string()),
                                ),
                            };

                            // Record the result in the parent's local task tracker.
                            CURRENT_TASK_STATE.with(move |gts| {
                                gts.write()
                                    .unwrap()
                                    .local_tasks
                                    .complete(local_task_id, output);
                            });
                        }
                        .instrument(span),
                    )
                    .await
                };
                // Local tasks share the state of the task that created them.
                let future = CURRENT_TASK_STATE.scope(global_task_state, future);

                Either::Right(TURBO_TASKS.scope(this2, future).instrument(span))
            }
        }
    }
}
1326
/// Snapshot of the `CurrentTaskState` flags that are reported to the backend when a task
/// execution completes.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`; only compiled with the `verify_determinism`
    /// feature.
    #[cfg(feature = "verify_determinism")]
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1336
1337impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1338 fn dynamic_call(
1339 &self,
1340 native_fn: &'static NativeFunction,
1341 this: Option<RawVc>,
1342 arg: &mut dyn StackDynTaskInputs,
1343 persistence: TaskPersistence,
1344 ) -> RawVc {
1345 self.dynamic_call(native_fn, this, arg, persistence)
1346 }
1347 fn native_call(
1348 &self,
1349 native_fn: &'static NativeFunction,
1350 this: Option<RawVc>,
1351 arg: &mut dyn StackDynTaskInputs,
1352 persistence: TaskPersistence,
1353 ) -> RawVc {
1354 self.native_call(native_fn, this, arg, persistence)
1355 }
1356 fn trait_call(
1357 &self,
1358 trait_method: &'static TraitMethod,
1359 this: RawVc,
1360 arg: &mut dyn StackDynTaskInputs,
1361 persistence: TaskPersistence,
1362 ) -> RawVc {
1363 self.trait_call(trait_method, this, arg, persistence)
1364 }
1365
1366 #[track_caller]
1367 fn run(
1368 &self,
1369 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1370 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1371 let this = self.pin();
1372 Box::pin(async move { this.run(future).await })
1373 }
1374
1375 #[track_caller]
1376 fn run_once(
1377 &self,
1378 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1379 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1380 let this = self.pin();
1381 Box::pin(async move { this.run_once(future).await })
1382 }
1383
1384 #[track_caller]
1385 fn run_once_with_reason(
1386 &self,
1387 reason: StaticOrArc<dyn InvalidationReason>,
1388 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1389 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1390 {
1391 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1392 reason_set.insert(reason);
1393 }
1394 let this = self.pin();
1395 Box::pin(async move { this.run_once(future).await })
1396 }
1397
1398 #[track_caller]
1399 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1400 self.start_once_process(future)
1401 }
1402
1403 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1404 if let Err(e) = self.compilation_events.send(event) {
1405 tracing::warn!("Failed to send compilation event: {e}");
1406 }
1407 }
1408
1409 fn get_task_name(&self, task: TaskId) -> String {
1410 self.backend.get_task_name(task, self)
1411 }
1412}
1413
1414impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1415 #[instrument(level = "info", skip_all, name = "invalidate")]
1416 fn invalidate(&self, task: TaskId) {
1417 self.backend.invalidate_task(task, self);
1418 }
1419
1420 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1421 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1422 {
1423 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1424 reason_set.insert(reason);
1425 }
1426 self.backend.invalidate_task(task, self);
1427 }
1428
1429 fn invalidate_serialization(&self, task: TaskId) {
1430 self.backend.invalidate_serialization(task, self);
1431 }
1432
1433 #[track_caller]
1434 fn try_read_task_output(
1435 &self,
1436 task: TaskId,
1437 options: ReadOutputOptions,
1438 ) -> Result<Result<RawVc, EventListener>> {
1439 if options.consistency == ReadConsistency::Eventual {
1440 debug_assert_not_in_top_level_task("read_task_output");
1441 }
1442 self.backend.try_read_task_output(
1443 task,
1444 current_task_if_available("reading Vcs"),
1445 options,
1446 self,
1447 )
1448 }
1449
1450 #[track_caller]
1451 fn try_read_task_cell(
1452 &self,
1453 task: TaskId,
1454 index: CellId,
1455 options: ReadCellOptions,
1456 ) -> Result<Result<TypedCellContent, EventListener>> {
1457 let reader = current_task_if_available("reading Vcs");
1458 self.backend
1459 .try_read_task_cell(task, index, reader, options, self)
1460 }
1461
1462 fn try_read_own_task_cell(
1463 &self,
1464 current_task: TaskId,
1465 index: CellId,
1466 ) -> Result<TypedCellContent> {
1467 self.backend
1468 .try_read_own_task_cell(current_task, index, self)
1469 }
1470
1471 #[track_caller]
1472 fn try_read_local_output(
1473 &self,
1474 execution_id: ExecutionId,
1475 local_task_id: LocalTaskId,
1476 ) -> Result<Result<RawVc, EventListener>> {
1477 debug_assert_not_in_top_level_task("read_local_output");
1478 CURRENT_TASK_STATE.with(|gts| {
1479 let gts_read = gts.read().unwrap();
1480
1481 gts_read.assert_execution_id(execution_id);
1486
1487 match gts_read.local_tasks.get(local_task_id) {
1488 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1489 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1490 }
1491 })
1492 }
1493
1494 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1495 self.backend.read_task_collectibles(
1498 task,
1499 trait_id,
1500 current_task_if_available("reading collectibles"),
1501 self,
1502 )
1503 }
1504
1505 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1506 self.backend.emit_collectible(
1507 trait_type,
1508 collectible,
1509 current_task("emitting collectible"),
1510 self,
1511 );
1512 }
1513
1514 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1515 self.backend.unemit_collectible(
1516 trait_type,
1517 collectible,
1518 count,
1519 current_task("emitting collectible"),
1520 self,
1521 );
1522 }
1523
1524 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1525 for (&collectible, &count) in collectibles {
1526 if count > 0 {
1527 self.backend.unemit_collectible(
1528 trait_type,
1529 collectible,
1530 count as u32,
1531 current_task("emitting collectible"),
1532 self,
1533 );
1534 }
1535 }
1536 }
1537
1538 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
1539 self.try_read_own_task_cell(task, index)
1540 }
1541
1542 fn update_own_task_cell(
1543 &self,
1544 task: TaskId,
1545 index: CellId,
1546 content: CellContent,
1547 updated_key_hashes: Option<SmallVec<[u64; 2]>>,
1548 content_hash: Option<CellHash>,
1549 verification_mode: VerificationMode,
1550 ) {
1551 self.backend.update_task_cell(
1552 task,
1553 index,
1554 content,
1555 updated_key_hashes,
1556 content_hash,
1557 verification_mode,
1558 self,
1559 );
1560 }
1561
1562 fn connect_task(&self, task: TaskId) {
1563 self.backend
1564 .connect_task(task, current_task_if_available("connecting task"), self);
1565 }
1566
1567 fn mark_own_task_as_finished(&self, task: TaskId) {
1568 self.backend.mark_own_task_as_finished(task, self);
1569 }
1570
1571 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1572 self.backend.mark_own_task_as_session_dependent(task, self);
1573 }
1574
1575 fn spawn_detached_for_testing(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1578 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1581 global_task_state
1582 .write()
1583 .unwrap()
1584 .local_tasks
1585 .register_detached();
1586 let wrapped = async move {
1587 struct DropGuard;
1589 impl Drop for DropGuard {
1590 fn drop(&mut self) {
1591 CURRENT_TASK_STATE
1592 .with(|ts| ts.write().unwrap().local_tasks.decrement_in_flight());
1593 }
1594 }
1595 let _guard = DropGuard;
1596 fut.await;
1597 };
1598 tokio::spawn(TURBO_TASKS.scope(
1599 turbo_tasks(),
1600 CURRENT_TASK_STATE.scope(global_task_state, wrapped),
1601 ));
1602 }
1603
1604 fn task_statistics(&self) -> &TaskStatisticsApi {
1605 self.backend.task_statistics()
1606 }
1607
1608 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1609 let this = self.pin();
1610 Box::pin(async move {
1611 this.stop_and_wait().await;
1612 })
1613 }
1614
1615 fn subscribe_to_compilation_events(
1616 &self,
1617 event_types: Option<Vec<String>>,
1618 ) -> Receiver<Arc<dyn CompilationEvent>> {
1619 self.compilation_events.subscribe(event_types)
1620 }
1621
1622 fn is_tracking_dependencies(&self) -> bool {
1623 self.backend.is_tracking_dependencies()
1624 }
1625}
1626
1627impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1628 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1629 self.pin()
1630 }
1631 fn backend(&self) -> &B {
1632 &self.backend
1633 }
1634
1635 #[track_caller]
1636 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1637 self.schedule_background_job(async move |this| {
1638 this.backend.run_backend_job(job, &*this).await;
1639 this
1640 })
1641 }
1642
1643 #[track_caller]
1644 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1645 self.schedule_foreground_job(async move |this| {
1646 this.backend.run_backend_job(job, &*this).await;
1647 this
1648 })
1649 }
1650
1651 #[track_caller]
1652 fn schedule(&self, task: TaskId, priority: TaskPriority) {
1653 self.schedule(task, priority)
1654 }
1655
1656 fn get_current_task_priority(&self) -> TaskPriority {
1657 CURRENT_TASK_STATE
1658 .try_with(|task_state| task_state.read().unwrap().priority)
1659 .unwrap_or(TaskPriority::initial())
1660 }
1661
1662 fn program_duration_until(&self, instant: Instant) -> Duration {
1663 instant - self.program_start
1664 }
1665
1666 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1667 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1669 }
1670
1671 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1672 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1674 }
1675
1676 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1677 unsafe { self.task_id_factory.reuse(id.into()) }
1678 }
1679
1680 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1681 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1682 }
1683
1684 fn is_idle(&self) -> bool {
1685 self.currently_scheduled_foreground_jobs
1686 .load(Ordering::Acquire)
1687 == 0
1688 }
1689}
1690
1691async fn wait_for_local_tasks() {
1692 let listener =
1693 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_tasks.listen_for_in_flight());
1694 let Some(listener) = listener else {
1695 return;
1696 };
1697 listener.await;
1698}
1699
1700pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1701 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1702 Ok(id) => id,
1703 Err(_) => panic!(
1704 "{from} can only be used in the context of a turbo_tasks task execution or \
1705 turbo_tasks run"
1706 ),
1707 }
1708}
1709
1710pub(crate) fn current_task(from: &str) -> TaskId {
1711 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1712 Ok(Some(id)) => id,
1713 Ok(None) | Err(_) => {
1714 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1715 }
1716 }
1717}
1718
1719#[track_caller]
1722pub(crate) fn debug_assert_in_top_level_task(message: &str) {
1723 if !cfg!(debug_assertions) {
1724 return;
1725 }
1726
1727 let in_top_level = CURRENT_TASK_STATE
1728 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1729 .unwrap_or(true);
1730 if !in_top_level {
1731 panic!("{message}");
1732 }
1733}
1734
1735#[track_caller]
1736pub(crate) fn debug_assert_not_in_top_level_task(operation: &str) {
1737 if !cfg!(debug_assertions) {
1738 return;
1739 }
1740
1741 let suppressed = SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK
1744 .try_with(|&suppressed| suppressed)
1745 .unwrap_or(false);
1746 if suppressed {
1747 return;
1748 }
1749
1750 let in_top_level = CURRENT_TASK_STATE
1751 .try_with(|ts| ts.read().unwrap().in_top_level_task)
1752 .unwrap_or(false);
1753 if in_top_level {
1754 panic!(
1755 "Eventually consistent read ({operation}) cannot be performed from a top-level task. \
1756 Top-level tasks (e.g. code inside `.run_once(...)`) must use strongly consistent \
1757 reads to avoid leaking inconsistent return values."
1758 );
1759 }
1760}
1761
1762pub async fn run<T: Send + 'static>(
1763 tt: Arc<dyn TurboTasksApi>,
1764 future: impl Future<Output = Result<T>> + Send + 'static,
1765) -> Result<T> {
1766 let (tx, rx) = tokio::sync::oneshot::channel();
1767
1768 tt.run(Box::pin(async move {
1769 let result = future.await?;
1770 tx.send(result)
1771 .map_err(|_| anyhow!("unable to send result"))?;
1772 Ok(())
1773 }))
1774 .await?;
1775
1776 Ok(rx.await?)
1777}
1778
1779pub async fn run_once<T: Send + 'static>(
1780 tt: Arc<dyn TurboTasksApi>,
1781 future: impl Future<Output = Result<T>> + Send + 'static,
1782) -> Result<T> {
1783 let (tx, rx) = tokio::sync::oneshot::channel();
1784
1785 tt.run_once(Box::pin(async move {
1786 let result = future.await?;
1787 tx.send(result)
1788 .map_err(|_| anyhow!("unable to send result"))?;
1789 Ok(())
1790 }))
1791 .await?;
1792
1793 Ok(rx.await?)
1794}
1795
1796pub async fn run_once_with_reason<T: Send + 'static>(
1797 tt: Arc<dyn TurboTasksApi>,
1798 reason: impl InvalidationReason,
1799 future: impl Future<Output = Result<T>> + Send + 'static,
1800) -> Result<T> {
1801 let (tx, rx) = tokio::sync::oneshot::channel();
1802
1803 tt.run_once_with_reason(
1804 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1805 Box::pin(async move {
1806 let result = future.await?;
1807 tx.send(result)
1808 .map_err(|_| anyhow!("unable to send result"))?;
1809 Ok(())
1810 }),
1811 )
1812 .await?;
1813
1814 Ok(rx.await?)
1815}
1816
1817pub fn dynamic_call(
1819 func: &'static NativeFunction,
1820 this: Option<RawVc>,
1821 arg: &mut dyn StackDynTaskInputs,
1822 persistence: TaskPersistence,
1823) -> RawVc {
1824 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1825}
1826
1827pub fn trait_call(
1829 trait_method: &'static TraitMethod,
1830 this: RawVc,
1831 arg: &mut dyn StackDynTaskInputs,
1832 persistence: TaskPersistence,
1833) -> RawVc {
1834 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1835}
1836
1837pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1838 TURBO_TASKS.with(|arc| arc.clone())
1839}
1840
1841pub fn turbo_tasks_weak() -> Weak<dyn TurboTasksApi> {
1842 TURBO_TASKS.with(Arc::downgrade)
1843}
1844
1845pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1846 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1847}
1848
1849pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1850 TURBO_TASKS.with(|arc| func(arc))
1851}
1852
/// Runs the synchronous closure `f` with `tt` installed as the value of the
/// `TURBO_TASKS` task-local and returns the closure's result.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1856
/// Wraps the future `f` so that it runs with `tt` installed as the value of
/// the `TURBO_TASKS` task-local; the returned future yields `f`'s output.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1863
1864pub fn with_turbo_tasks_for_testing<T>(
1865 tt: Arc<dyn TurboTasksApi>,
1866 current_task: TaskId,
1867 execution_id: ExecutionId,
1868 f: impl Future<Output = T>,
1869) -> impl Future<Output = T> {
1870 TURBO_TASKS.scope(
1871 tt,
1872 CURRENT_TASK_STATE.scope(
1873 Arc::new(RwLock::new(CurrentTaskState::new(
1874 current_task,
1875 execution_id,
1876 TaskPriority::initial(),
1877 false, ))),
1879 f,
1880 ),
1881 )
1882}
1883
1884pub fn spawn_detached_for_testing(f: impl Future<Output = ()> + Send + 'static) {
1889 turbo_tasks().spawn_detached_for_testing(Box::pin(f));
1890}
1891
1892pub fn current_task_for_testing() -> Option<TaskId> {
1893 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1894}
1895
1896pub fn mark_session_dependent() {
1898 with_turbo_tasks(|tt| {
1899 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1900 });
1901}
1902
1903pub fn mark_finished() {
1906 with_turbo_tasks(|tt| {
1907 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1908 });
1909}
1910
1911pub fn get_serialization_invalidator() -> SerializationInvalidator {
1917 CURRENT_TASK_STATE.with(|cell| {
1918 let CurrentTaskState {
1919 task_id,
1920 #[cfg(feature = "verify_determinism")]
1921 stateful,
1922 ..
1923 } = &mut *cell.write().unwrap();
1924 #[cfg(feature = "verify_determinism")]
1925 {
1926 *stateful = true;
1927 }
1928 let Some(task_id) = *task_id else {
1929 panic!(
1930 "get_serialization_invalidator() can only be used in the context of a turbo_tasks \
1931 task execution"
1932 );
1933 };
1934 SerializationInvalidator::new(task_id)
1935 })
1936}
1937
1938pub fn mark_invalidator() {
1939 CURRENT_TASK_STATE.with(|cell| {
1940 let CurrentTaskState {
1941 has_invalidator, ..
1942 } = &mut *cell.write().unwrap();
1943 *has_invalidator = true;
1944 })
1945}
1946
/// Sets the `stateful` flag of the current task state.
///
/// The flag only exists with the `verify_determinism` feature; without it
/// this function does nothing.
pub fn mark_stateful() {
    #[cfg(feature = "verify_determinism")]
    CURRENT_TASK_STATE.with(|cell| {
        cell.write().unwrap().stateful = true;
    });
}
1962
1963pub fn mark_top_level_task() {
1967 if cfg!(debug_assertions) {
1968 CURRENT_TASK_STATE.with(|cell| {
1969 cell.write().unwrap().in_top_level_task = true;
1970 })
1971 }
1972}
1973
1974pub fn unmark_top_level_task_may_leak_eventually_consistent_state() {
1985 if cfg!(debug_assertions) {
1986 CURRENT_TASK_STATE.with(|cell| {
1987 cell.write().unwrap().in_top_level_task = false;
1988 })
1989 }
1990}
1991
/// Currently a no-op: the body is empty.
///
/// NOTE(review): presumably kept so existing callers continue to compile —
/// confirm before removing.
pub fn prevent_gc() {
}
1995
1996pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1997 with_turbo_tasks(|tt| {
1998 let raw_vc = collectible.node.node;
1999 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
2000 })
2001}
2002
2003pub(crate) async fn read_task_output(
2004 this: &dyn TurboTasksApi,
2005 id: TaskId,
2006 options: ReadOutputOptions,
2007) -> Result<RawVc> {
2008 loop {
2009 match this.try_read_task_output(id, options)? {
2010 Ok(result) => return Ok(result),
2011 Err(listener) => listener.await,
2012 }
2013 }
2014}
2015
/// Identifies one cell of a task by the owning task's id and the cell's
/// [`CellId`].
///
/// `find_cell_by_id` creates instances for the currently executing task.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// Task that owns the cell.
    current_task: TaskId,
    /// Value type and per-type index of the cell.
    index: CellId,
}
2026
/// Shorthand for the `Target` type of `T`'s [`VcRead`] implementation.
type VcReadTarget<T> = <<T as VcValueType>::Read as VcRead<T>>::Target;
2028
2029impl CurrentCellRef {
2030 fn conditional_update<T>(
2032 &self,
2033 functor: impl FnOnce(Option<&T>) -> Option<(T, Option<SmallVec<[u64; 2]>>, Option<CellHash>)>,
2034 ) where
2035 T: VcValueType,
2036 {
2037 self.conditional_update_with_shared_reference(|old_shared_reference| {
2038 let old_ref = old_shared_reference.and_then(|sr| sr.0.downcast_ref::<T>());
2039 let (new_value, updated_key_hashes, content_hash) = functor(old_ref)?;
2040 Some((
2041 SharedReference::new(triomphe::Arc::new(new_value)),
2042 updated_key_hashes,
2043 content_hash,
2044 ))
2045 })
2046 }
2047
2048 fn conditional_update_with_shared_reference(
2050 &self,
2051 functor: impl FnOnce(
2052 Option<&SharedReference>,
2053 ) -> Option<(
2054 SharedReference,
2055 Option<SmallVec<[u64; 2]>>,
2056 Option<CellHash>,
2057 )>,
2058 ) {
2059 let tt = turbo_tasks();
2060 let cell_content = tt.read_own_task_cell(self.current_task, self.index).ok();
2061 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
2062 if let Some((update, updated_key_hashes, content_hash)) = update {
2063 tt.update_own_task_cell(
2064 self.current_task,
2065 self.index,
2066 CellContent(Some(update)),
2067 updated_key_hashes,
2068 content_hash,
2069 VerificationMode::EqualityCheck,
2070 )
2071 }
2072 }
2073
2074 pub fn compare_and_update<T>(&self, new_value: T)
2108 where
2109 T: PartialEq + VcValueType,
2110 {
2111 self.conditional_update(|old_value| {
2112 if let Some(old_value) = old_value
2113 && old_value == &new_value
2114 {
2115 return None;
2116 }
2117 Some((new_value, None, None))
2118 });
2119 }
2120
2121 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
2129 where
2130 T: VcValueType + PartialEq,
2131 {
2132 self.conditional_update_with_shared_reference(|old_sr| {
2133 if let Some(old_sr) = old_sr {
2134 let old_value = extract_sr_value::<T>(old_sr);
2135 let new_value = extract_sr_value::<T>(&new_shared_reference);
2136 if old_value == new_value {
2137 return None;
2138 }
2139 }
2140 Some((new_shared_reference, None, None))
2141 });
2142 }
2143
2144 pub fn hashed_compare_and_update<T>(&self, new_value: T)
2153 where
2154 T: PartialEq + DeterministicHash + VcValueType,
2155 {
2156 self.conditional_update(|old_value| {
2157 if let Some(old_value) = old_value
2158 && old_value == &new_value
2159 {
2160 return None;
2161 }
2162 let content_hash = hash_xxh3_hash128(&new_value);
2163 Some((new_value, None, Some(content_hash)))
2164 });
2165 }
2166
2167 pub fn hashed_compare_and_update_with_shared_reference<T>(
2173 &self,
2174 new_shared_reference: SharedReference,
2175 ) where
2176 T: VcValueType + PartialEq + DeterministicHash,
2177 {
2178 self.conditional_update_with_shared_reference(move |old_sr| {
2179 if let Some(old_sr) = old_sr {
2180 let old_value = extract_sr_value::<T>(old_sr);
2181 let new_value = extract_sr_value::<T>(&new_shared_reference);
2182 if old_value == new_value {
2183 return None;
2184 }
2185 }
2186 let content_hash = hash_xxh3_hash128(extract_sr_value::<T>(&new_shared_reference));
2187 Some((new_shared_reference, None, Some(content_hash)))
2188 });
2189 }
2190
2191 pub fn keyed_compare_and_update<T>(&self, new_value: T)
2193 where
2194 T: PartialEq + VcValueType,
2195 VcReadTarget<T>: KeyedEq,
2196 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2197 {
2198 self.conditional_update(|old_value| {
2199 let Some(old_value) = old_value else {
2200 return Some((new_value, None, None));
2201 };
2202 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2203 let new_value_ref = <T as VcValueType>::Read::value_to_target_ref(&new_value);
2204 let updated_keys = old_value.different_keys(new_value_ref);
2205 if updated_keys.is_empty() {
2206 return None;
2207 }
2208 let updated_key_hashes = updated_keys
2210 .into_iter()
2211 .map(|key| FxBuildHasher.hash_one(key))
2212 .collect();
2213 Some((new_value, Some(updated_key_hashes), None))
2214 });
2215 }
2216
2217 pub fn keyed_compare_and_update_with_shared_reference<T>(
2220 &self,
2221 new_shared_reference: SharedReference,
2222 ) where
2223 T: VcValueType + PartialEq,
2224 VcReadTarget<T>: KeyedEq,
2225 <VcReadTarget<T> as KeyedEq>::Key: std::hash::Hash,
2226 {
2227 self.conditional_update_with_shared_reference(|old_sr| {
2228 let Some(old_sr) = old_sr else {
2229 return Some((new_shared_reference, None, None));
2230 };
2231 let old_value = extract_sr_value::<T>(old_sr);
2232 let old_value = <T as VcValueType>::Read::value_to_target_ref(old_value);
2233 let new_value = extract_sr_value::<T>(&new_shared_reference);
2234 let new_value = <T as VcValueType>::Read::value_to_target_ref(new_value);
2235 let updated_keys = old_value.different_keys(new_value);
2236 if updated_keys.is_empty() {
2237 return None;
2238 }
2239 let updated_key_hashes = updated_keys
2241 .into_iter()
2242 .map(|key| FxBuildHasher.hash_one(key))
2243 .collect();
2244 Some((new_shared_reference, Some(updated_key_hashes), None))
2245 });
2246 }
2247
2248 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
2250 where
2251 T: VcValueType,
2252 {
2253 let tt = turbo_tasks();
2254 tt.update_own_task_cell(
2255 self.current_task,
2256 self.index,
2257 CellContent(Some(SharedReference::new(triomphe::Arc::new(new_value)))),
2258 None,
2259 None,
2260 verification_mode,
2261 )
2262 }
2263
2264 pub fn update_with_shared_reference(
2272 &self,
2273 shared_ref: SharedReference,
2274 verification_mode: VerificationMode,
2275 ) {
2276 let tt = turbo_tasks();
2277 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
2278 let content = tt.read_own_task_cell(self.current_task, self.index).ok();
2279 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2280 shared_ref_exp != shared_ref
2282 } else {
2283 true
2284 }
2285 } else {
2286 true
2287 };
2288 if update {
2289 tt.update_own_task_cell(
2290 self.current_task,
2291 self.index,
2292 CellContent(Some(shared_ref)),
2293 None,
2294 None,
2295 verification_mode,
2296 )
2297 }
2298 }
2299}
2300
2301impl From<CurrentCellRef> for RawVc {
2302 fn from(cell: CurrentCellRef) -> Self {
2303 RawVc::TaskCell(cell.current_task, cell.index)
2304 }
2305}
2306
2307fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
2308 sr.0.downcast_ref::<T>()
2309 .expect("cannot update SharedReference of different type")
2310}
2311
2312pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
2313 find_cell_by_id(T::get_value_type_id())
2314}
2315
2316pub fn find_cell_by_id(ty: ValueTypeId) -> CurrentCellRef {
2317 CURRENT_TASK_STATE.with(|ts| {
2318 let current_task = current_task("celling turbo_tasks values");
2319 let mut ts = ts.write().unwrap();
2320 let map = ts.cell_counters.as_mut().unwrap();
2321 let current_index = map.entry(ty).or_default();
2322 let index = *current_index;
2323 *current_index += 1;
2324 CurrentCellRef {
2325 current_task,
2326 index: CellId { type_id: ty, index },
2327 }
2328 })
2329}
2330
2331pub(crate) async fn read_local_output(
2332 this: &dyn TurboTasksApi,
2333 execution_id: ExecutionId,
2334 local_task_id: LocalTaskId,
2335) -> Result<RawVc> {
2336 loop {
2337 match this.try_read_local_output(execution_id, local_task_id)? {
2338 Ok(raw_vc) => return Ok(raw_vc),
2339 Err(event_listener) => event_listener.await,
2340 }
2341 }
2342}