1use std::{
2 fmt::Display,
3 future::Future,
4 hash::BuildHasherDefault,
5 mem::take,
6 pin::Pin,
7 sync::{
8 Arc, Mutex, RwLock, Weak,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use bincode::{Decode, Encode};
17use rustc_hash::FxHasher;
18use serde::{Deserialize, Serialize};
19use tokio::{select, sync::mpsc::Receiver, task_local};
20use tokio_util::task::TaskTracker;
21use tracing::{Instrument, instrument};
22
23use crate::{
24 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
25 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
26 VcValueTrait, VcValueType,
27 backend::{
28 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
29 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
30 },
31 capture_future::CaptureFuture,
32 event::{Event, EventListener},
33 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
34 id_factory::IdFactoryWithReuse,
35 macro_helpers::NativeFunction,
36 magic_any::MagicAny,
37 message_queue::{CompilationEvent, CompilationEventQueue},
38 raw_vc::{CellId, RawVc},
39 registry,
40 serialization_invalidation::SerializationInvalidator,
41 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
42 task_statistics::TaskStatisticsApi,
43 trace::TraceRawVcs,
44 util::{IdFactory, StaticOrArc},
45};
46
/// Object-safe entry points for calling turbo-tasks functions and for running
/// futures inside a turbo-tasks context.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with the optional receiver `this` and the argument `arg`
    /// and returns a [`RawVc`] referring to the result.
    ///
    /// The `TurboTasks` implementation in this file forwards to `native_call` when
    /// the receiver and the argument are already resolved and otherwise schedules
    /// a local task that performs the resolution first.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` directly, creating or looking up the task in the backend.
    ///
    /// NOTE(review): presumably all inputs must already be resolved; `dynamic_call`
    /// only takes this path after checking that.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on the receiver `this` with the argument `arg`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Runs `future` in a turbo-tasks context and returns a boxed future that
    /// resolves to its result.
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` as a one-off task and returns a boxed future that resolves to
    /// its result.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`TurboTasksCallApi::run_once`], but additionally records `reason`.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` as a one-off process without waiting for it.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
}
93
/// Object-safe API that running tasks use to interact with the turbo-tasks
/// engine: invalidation, reading outputs and cells, collectibles and statistics.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason` for the invalidation.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    fn invalidate_serialization(&self, task: TaskId);

    /// Tries to read the output of `task`.
    ///
    /// The outer `Result` reports an error; the inner `Err` carries an
    /// [`EventListener`] (presumably to await before retrying — confirm against
    /// the backend).
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read the cell `index` of `task`; same result shape as
    /// [`TurboTasksApi::try_read_task_output`].
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Tries to read the output of the local task `local_task_id` that was created
    /// during the execution `execution_id`.
    ///
    /// The `TurboTasks` implementation in this file panics when `execution_id` is
    /// not the currently running execution.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of trait `trait_id` from `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for the trait `trait_type` from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Unemits `collectible` `count` times from the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Unemits every entry of `collectibles` from the current task.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Tries to read the cell `index` of `current_task` itself.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads the cell `index` of `task` itself.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Replaces the content of the cell `index` of `task` itself.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        is_serializable_cell_content: bool,
        content: CellContent,
        verification_mode: VerificationMode,
    );
    /// Marks `task` as finished.
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Sets the aggregation number of `task`.
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    /// Marks `task` as session dependent.
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task` to the current task, if there is one.
    fn connect_task(&self, task: TaskId);

    /// Wraps `f` so it can run detached from the current task.
    ///
    /// NOTE(review): intended for tests only, judging by the name — confirm.
    fn detached_for_testing(
        &self,
        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;

    /// Returns the task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the engine and returns a future that resolves once it has stopped.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events, optionally restricted to `event_types`.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;
    /// Sends a compilation event to the subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Returns whether dependency tracking is enabled.
    fn is_tracking_dependencies(&self) -> bool;
}
195
/// Opaque wrapper around a value of type `T`.
///
/// In this file it wraps task ids that were freshly allocated
/// (`get_fresh_*_task_id`) or that are handed back for reuse
/// (`reuse_*_task_id`).
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible here; presumably the caller
    /// must guarantee that `inner` is not in use anywhere else — confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must not treat the borrowed value
    /// as being in use — confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
225
/// API the engine exposes to its [`Backend`] implementation `B`.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owning handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Allocates a fresh id for a persistent task.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Allocates a fresh id for a transient task.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Hands a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must no longer be in use — confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Hands a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must no longer be in use — confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution.
    fn schedule(&self, task: TaskId);

    /// Schedules a backend job that counts as a foreground job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend job that counts as a background job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns a duration relative to `instant`.
    ///
    /// NOTE(review): presumably the time from program start until `instant` —
    /// confirm against the implementation.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Returns whether the engine is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns the backend.
    fn backend(&self) -> &B;
}
262
/// Aggregated information about the work done since the last time it was
/// collected (see `TurboTasks::aggregated_update_info`).
// The private unit field prevents construction outside this module, which is
// what clippy's `manual_non_exhaustive` lint would otherwise flag.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated time during which foreground jobs were running.
    pub duration: Duration,
    /// Accumulated number of scheduled tasks.
    pub tasks: usize,
    /// Invalidation reasons recorded in that time.
    pub reasons: InvalidationReasonSet,
    /// Keeps the struct non-constructible from outside this module.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
271
/// Selects which kind of task a call creates in the backend.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum TaskPersistence {
    /// `native_call` uses `get_or_create_persistent_task` for this variant.
    Persistent,

    /// `native_call` uses `get_or_create_transient_task` for this variant.
    Transient,
}
285
286impl Display for TaskPersistence {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 match self {
289 TaskPersistence::Persistent => write!(f, "persistent"),
290 TaskPersistence::Transient => write!(f, "transient"),
291 }
292 }
293}
294
/// Consistency level requested when reading a task output.
///
/// NOTE(review): the exact guarantees are implemented by the backend and are
/// not visible in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default level.
    #[default]
    Eventual,
    /// The stronger level.
    Strong,
}
307
/// Controls whether a read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Always track the read (the default).
    #[default]
    Tracked,
    /// Track the read only when it produced an error.
    TrackOnlyError,
    /// Never track the read.
    Untracked,
}

impl ReadTracking {
    /// Returns whether a read with this mode should be tracked, given whether
    /// the read resulted in an error (`is_err`).
    pub fn should_track(&self, is_err: bool) -> bool {
        matches!(
            (self, is_err),
            (Self::Tracked, _) | (Self::TrackOnlyError, true)
        )
    }
}

/// Renders the mode as lowercase, human-readable text.
impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Tracked => "tracked",
            Self::TrackOnlyError => "track only error",
            Self::Untracked => "untracked",
        };
        f.write_str(label)
    }
}
344
/// The turbo-tasks engine: owns the backend, hands out ids, spawns task
/// executions on tokio and keeps track of foreground/background jobs.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference, upgraded by `pin()` to obtain an owning `Arc`.
    this: Weak<Self>,
    backend: B,
    /// Ids in the range below `TRANSIENT_TASK_BIT`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Ids in the range starting at `TRANSIENT_TASK_BIT`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Produces a new id for every execution started by `run` and `schedule`.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; scheduled work checks it before starting.
    stopped: AtomicBool,
    /// Number of foreground jobs that have begun but not finished.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs that have begun but not finished.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Tasks scheduled since the foreground job counter last dropped to zero.
    scheduled_tasks: AtomicUsize,
    /// Time at which the foreground job counter last left zero.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` plus the recorded invalidation
    /// reasons, drained by `aggregated_update_info`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job counter leaves zero.
    event_foreground_start: Event,
    /// Notified when the foreground job counter returns to zero.
    event_foreground_done: Event,
    /// Notified when the background job counter returns to zero.
    event_background_done: Event,
    /// Time at which this instance was created.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
367
/// Per-execution state, stored in the `CURRENT_TASK_STATE` task-local.
struct CurrentTaskState {
    /// The executing task; `None` for the temporary state created by
    /// `new_temporary`.
    task_id: Option<TaskId>,
    /// Id of this execution; local tasks are checked against it.
    execution_id: ExecutionId,

    /// Reported to the backend when the execution completes.
    /// NOTE(review): presumably "the task holds state in cells" — confirm.
    stateful: bool,

    /// Reported to the backend when the execution completes.
    /// NOTE(review): presumably "the task uses an external invalidator" — confirm.
    has_invalidator: bool,

    /// Counters keyed by value type, taken and handed to the backend when the
    /// execution completes; `None` for temporary states.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created during this execution; a `LocalTaskId` is the
    /// one-based index into this vector.
    local_tasks: Vec<LocalTask>,

    /// Tracks the spawned local task futures so the execution can wait for all
    /// of them before it completes.
    local_task_tracker: TaskTracker,
}
399
400impl CurrentTaskState {
401 fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
402 Self {
403 task_id: Some(task_id),
404 execution_id,
405 stateful: false,
406 has_invalidator: false,
407 cell_counters: Some(AutoMap::default()),
408 local_tasks: Vec::new(),
409 local_task_tracker: TaskTracker::new(),
410 }
411 }
412
413 fn new_temporary(execution_id: ExecutionId) -> Self {
414 Self {
415 task_id: None,
416 execution_id,
417 stateful: false,
418 has_invalidator: false,
419 cell_counters: None,
420 local_tasks: Vec::new(),
421 local_task_tracker: TaskTracker::new(),
422 }
423 }
424
425 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
426 if self.execution_id != expected_execution_id {
427 panic!(
428 "Local tasks can only be scheduled/awaited within the same execution of the \
429 parent task that created them"
430 );
431 }
432 }
433
434 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
435 self.local_tasks.push(local_task);
436 if cfg!(debug_assertions) {
438 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
439 } else {
440 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
441 }
442 }
443
444 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
445 &self.local_tasks[(*local_task_id as usize) - 1]
447 }
448
449 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
450 &mut self.local_tasks[(*local_task_id as usize) - 1]
451 }
452}
453
// Task-local context; both values are set with `scope` around every future
// that this module spawns or runs.
task_local! {
    /// The engine instance the current tokio task runs under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the execution the current tokio task belongs to. Local tasks
    /// share the state of the execution that created them.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}
461
462impl<B: Backend + 'static> TurboTasks<B> {
463 pub fn new(backend: B) -> Arc<Self> {
469 let task_id_factory = IdFactoryWithReuse::new(
470 TaskId::MIN,
471 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
472 );
473 let transient_task_id_factory =
474 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
475 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
476 let this = Arc::new_cyclic(|this| Self {
477 this: this.clone(),
478 backend,
479 task_id_factory,
480 transient_task_id_factory,
481 execution_id_factory,
482 stopped: AtomicBool::new(false),
483 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
484 currently_scheduled_background_jobs: AtomicUsize::new(0),
485 scheduled_tasks: AtomicUsize::new(0),
486 start: Default::default(),
487 aggregated_update: Default::default(),
488 event_foreground_done: Event::new(|| {
489 || "TurboTasks::event_foreground_done".to_string()
490 }),
491 event_foreground_start: Event::new(|| {
492 || "TurboTasks::event_foreground_start".to_string()
493 }),
494 event_background_done: Event::new(|| {
495 || "TurboTasks::event_background_done".to_string()
496 }),
497 program_start: Instant::now(),
498 compilation_events: CompilationEventQueue::default(),
499 });
500 this.backend.startup(&*this);
501 this
502 }
503
504 pub fn pin(&self) -> Arc<Self> {
505 self.this.upgrade().unwrap()
506 }
507
508 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
510 where
511 T: ?Sized,
512 F: Fn() -> Fut + Send + Sync + Clone + 'static,
513 Fut: Future<Output = Result<Vc<T>>> + Send,
514 {
515 let id = self.backend.create_transient_task(
516 TransientTaskType::Root(Box::new(move || {
517 let functor = functor.clone();
518 Box::pin(async move {
519 let raw_vc = functor().await?.node;
520 raw_vc.to_non_local().await
521 })
522 })),
523 self,
524 );
525 self.schedule(id);
526 id
527 }
528
/// Asks the backend to dispose the root task `task_id`.
pub fn dispose_root_task(&self, task_id: TaskId) {
    self.backend.dispose_root_task(task_id, self);
}
532
533 #[track_caller]
537 fn spawn_once_task<T, Fut>(&self, future: Fut)
538 where
539 T: ?Sized,
540 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
541 {
542 let id = self.backend.create_transient_task(
543 TransientTaskType::Once(Box::pin(async move {
544 let raw_vc = future.await?.node;
545 raw_vc.to_non_local().await
546 })),
547 self,
548 );
549 self.schedule(id);
550 }
551
552 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
553 &self,
554 future: impl Future<Output = Result<T>> + Send + 'static,
555 ) -> Result<T> {
556 let (tx, rx) = tokio::sync::oneshot::channel();
557 self.spawn_once_task(async move {
558 let result = future.await;
559 tx.send(result)
560 .map_err(|_| anyhow!("unable to send result"))?;
561 Ok(Completion::new())
562 });
563
564 rx.await?
565 }
566
/// Runs `future` in a turbo-tasks context on the calling tokio task and
/// returns its result.
///
/// The call counts as a foreground job for its whole duration and runs with
/// a temporary [`CurrentTaskState`] (no task id, no cell counters). Before
/// returning it waits for every local task tracked by that state.
///
/// # Errors
///
/// An error returned by `future` is converted into a
/// [`TurboTasksExecutionError`]; an `Err` produced by [`CaptureFuture`] is
/// reported as [`TurboTasksExecutionError::Panic`].
#[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
pub async fn run<T: TraceRawVcs + Send + 'static>(
    &self,
    future: impl Future<Output = Result<T>> + Send + 'static,
) -> Result<T, TurboTasksExecutionError> {
    self.begin_foreground_job();
    // Every run gets its own execution id, so local tasks cannot be mixed up
    // between executions.
    let execution_id = self.execution_id_factory.wrapping_get();
    let current_task_state =
        Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));

    let result = TURBO_TASKS
        .scope(
            self.pin(),
            CURRENT_TASK_STATE.scope(current_task_state, async {
                let result = CaptureFuture::new(future).await;

                // Clone the tracker out of the lock so no guard is held across
                // the await; closing it lets `wait` complete once all tracked
                // local tasks have finished.
                let ltt =
                    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
                ltt.close();
                ltt.wait().await;

                match result {
                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                    Ok(Err(err)) => Err(err.into()),
                    Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                }
            }),
        )
        .await;
    self.finish_foreground_job();
    result
}
601
/// Spawns `future` as a one-off task on tokio without waiting for it.
///
/// # Panics
///
/// The spawned tokio task panics if the underlying `run_once` fails.
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
    let this = self.pin();
    tokio::spawn(async move {
        this.pin()
            .run_once(async move {
                // The scheduled task itself holds a foreground job. Release it
                // while `future` runs and re-acquire it afterwards, so the
                // process is not counted as foreground work in the meantime.
                this.finish_foreground_job();
                future.await;
                this.begin_foreground_job();
                Ok(())
            })
            .await
            .unwrap()
    });
}
616
617 pub(crate) fn native_call(
618 &self,
619 native_fn: &'static NativeFunction,
620 this: Option<RawVc>,
621 arg: Box<dyn MagicAny>,
622 persistence: TaskPersistence,
623 ) -> RawVc {
624 let task_type = CachedTaskType {
625 native_fn,
626 this,
627 arg,
628 };
629 RawVc::TaskOutput(match persistence {
630 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
631 task_type,
632 current_task_if_available("turbo_function calls"),
633 self,
634 ),
635 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
636 task_type,
637 current_task_if_available("turbo_function calls"),
638 self,
639 ),
640 })
641 }
642
643 pub fn dynamic_call(
644 &self,
645 native_fn: &'static NativeFunction,
646 this: Option<RawVc>,
647 arg: Box<dyn MagicAny>,
648 persistence: TaskPersistence,
649 ) -> RawVc {
650 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
651 return self.native_call(native_fn, this, arg, persistence);
652 }
653 let task_type = LocalTaskSpec {
654 task_type: LocalTaskType::ResolveNative { native_fn },
655 this,
656 arg,
657 };
658 self.schedule_local_task(task_type, persistence)
659 }
660
661 pub fn trait_call(
662 &self,
663 trait_method: &'static TraitMethod,
664 this: RawVc,
665 arg: Box<dyn MagicAny>,
666 persistence: TaskPersistence,
667 ) -> RawVc {
668 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
672 match registry::get_value_type(type_id).get_trait_method(trait_method) {
673 Some(native_fn) => {
674 let arg = native_fn.arg_meta.filter_owned(arg);
675 return self.dynamic_call(native_fn, Some(this), arg, persistence);
676 }
677 None => {
678 }
682 }
683 }
684
685 let task_type = LocalTaskSpec {
687 task_type: LocalTaskType::ResolveTrait { trait_method },
688 this: Some(this),
689 arg,
690 };
691
692 self.schedule_local_task(task_type, persistence)
693 }
694
/// Spawns a tokio task that executes `task_id`, repeating the execution for
/// as long as the backend asks for it.
///
/// The spawned work counts as one foreground job and increments
/// `scheduled_tasks` once, regardless of how many executions it performs.
#[track_caller]
pub(crate) fn schedule(&self, task_id: TaskId) {
    self.begin_foreground_job();
    self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

    let this = self.pin();
    let future = async move {
        let mut schedule_again = true;
        while schedule_again {
            // Each execution gets a fresh id and a fresh state.
            let execution_id = this.execution_id_factory.wrapping_get();
            let current_task_state =
                Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
            let single_execution_future = async {
                // After `stop_and_wait` nothing is started anymore.
                if this.stopped.load(Ordering::Acquire) {
                    this.backend.task_execution_canceled(task_id, &*this);
                    return false;
                }

                // The backend may decline to start the execution.
                let Some(TaskExecutionSpec { future, span }) =
                    this.backend.try_start_task_execution(task_id, &*this)
                else {
                    return false;
                };

                async {
                    let result = CaptureFuture::new(future).await;

                    // Wait for all local tasks of this execution; the tracker is
                    // cloned so no lock guard is held across the await.
                    let ltt = CURRENT_TASK_STATE
                        .with(|ts| ts.read().unwrap().local_task_tracker.clone());
                    ltt.close();
                    ltt.wait().await;

                    let result = match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    };

                    let FinishedTaskState {
                        stateful,
                        has_invalidator,
                    } = this.finish_current_task_state();
                    // `CurrentTaskState::new` always sets the counters, so the
                    // `unwrap` only fails if they were taken twice.
                    let cell_counters = CURRENT_TASK_STATE
                        .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                    // The returned flag decides whether the loop runs again.
                    this.backend.task_execution_completed(
                        task_id,
                        result,
                        &cell_counters,
                        stateful,
                        has_invalidator,
                        &*this,
                    )
                }
                .instrument(span)
                .await
            };
            schedule_again = CURRENT_TASK_STATE
                .scope(current_task_state, single_execution_future)
                .await;
        }
        this.finish_foreground_job();
        anyhow::Ok(())
    };

    let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

    // With `tokio_tracing` the tokio task is named after the task description.
    #[cfg(feature = "tokio_tracing")]
    {
        let description = self.backend.get_task_description(task_id);
        tokio::task::Builder::new()
            .name(&description)
            .spawn(future)
            .unwrap();
    }
    #[cfg(not(feature = "tokio_tracing"))]
    tokio::task::spawn(future);
}
774
/// Registers a local task in the current execution's state, spawns it on
/// tokio and returns a [`RawVc::LocalOutput`] referring to its future
/// output.
///
/// Must be called inside a `CURRENT_TASK_STATE` scope. The spawned future
/// shares the state of the creating execution and is tracked by its
/// `local_task_tracker`.
fn schedule_local_task(
    &self,
    ty: LocalTaskSpec,
    persistence: TaskPersistence,
) -> RawVc {
    let task_type = ty.task_type;
    // Reserve a slot (in the `Scheduled` state) and capture everything needed
    // from the shared state while holding the write lock once.
    let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
        .with(|gts| {
            let mut gts_write = gts.write().unwrap();
            let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                done_event: Event::new(move || {
                    move || format!("LocalTask({task_type})::done_event")
                }),
            });
            (
                Arc::clone(gts),
                gts_write.task_id,
                gts_write.execution_id,
                local_task_id,
            )
        });

    #[cfg(feature = "tokio_tracing")]
    let description = format!(
        "[local] (parent: {}) {}",
        self.backend.get_task_description(parent_task_id),
        ty.task_type,
    );
    // Without `tokio_tracing` the parent id is otherwise unused.
    #[cfg(not(feature = "tokio_tracing"))]
    let _ = parent_task_id;

    let this = self.pin();
    let future = async move {
        let span = match &ty.task_type {
            LocalTaskType::ResolveNative { native_fn } => native_fn.resolve_span(),
            LocalTaskType::ResolveTrait { trait_method } => trait_method.resolve_span(),
        };
        async move {
            let result = match ty.task_type {
                LocalTaskType::ResolveNative { native_fn } => {
                    LocalTaskType::run_resolve_native(
                        native_fn,
                        ty.this,
                        &*ty.arg,
                        persistence,
                        this,
                    )
                    .await
                }
                LocalTaskType::ResolveTrait { trait_method } => {
                    LocalTaskType::run_resolve_trait(
                        trait_method,
                        // `trait_call` always provides a receiver for this variant.
                        ty.this.unwrap(),
                        &*ty.arg,
                        persistence,
                        this,
                    )
                    .await
                }
            };

            let output = match result {
                Ok(raw_vc) => OutputContent::Link(raw_vc),
                Err(err) => OutputContent::Error(
                    TurboTasksExecutionError::from(err).with_task_context(task_type, None),
                ),
            };

            let local_task = LocalTask::Done { output };

            // Swap the slot to `Done` and take the event out, so it is notified
            // only after the write lock has been released.
            let done_event = CURRENT_TASK_STATE.with(move |gts| {
                let mut gts_write = gts.write().unwrap();
                let scheduled_task =
                    std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
                let LocalTask::Scheduled { done_event } = scheduled_task else {
                    panic!("local task finished, but was not in the scheduled state?");
                };
                done_event
            });
            done_event.notify(usize::MAX)
        }
        .instrument(span)
        .await
    };
    // Track the future so the parent execution can wait for it.
    let future = global_task_state
        .read()
        .unwrap()
        .local_task_tracker
        .track_future(future);
    let future = CURRENT_TASK_STATE.scope(global_task_state, future);
    let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

    #[cfg(feature = "tokio_tracing")]
    tokio::task::Builder::new()
        .name(&description)
        .spawn(future)
        .unwrap();
    #[cfg(not(feature = "tokio_tracing"))]
    tokio::task::spawn(future);

    RawVc::LocalOutput(execution_id, local_task_id, persistence)
}
878
879 fn begin_foreground_job(&self) {
880 if self
881 .currently_scheduled_foreground_jobs
882 .fetch_add(1, Ordering::AcqRel)
883 == 0
884 {
885 *self.start.lock().unwrap() = Some(Instant::now());
886 self.event_foreground_start.notify(usize::MAX);
887 self.backend.idle_end(self);
888 }
889 }
890
891 fn finish_foreground_job(&self) {
892 if self
893 .currently_scheduled_foreground_jobs
894 .fetch_sub(1, Ordering::AcqRel)
895 == 1
896 {
897 self.backend.idle_start(self);
898 let total = self.scheduled_tasks.load(Ordering::Acquire);
901 self.scheduled_tasks.store(0, Ordering::Release);
902 if let Some(start) = *self.start.lock().unwrap() {
903 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
904 if let Some(update) = update.as_mut() {
905 update.0 += start.elapsed();
906 update.1 += total;
907 } else {
908 *update = Some((start.elapsed(), total));
909 }
910 }
911 self.event_foreground_done.notify(usize::MAX);
912 }
913 }
914
915 fn begin_background_job(&self) {
916 self.currently_scheduled_background_jobs
917 .fetch_add(1, Ordering::Relaxed);
918 }
919
920 fn finish_background_job(&self) {
921 if self
922 .currently_scheduled_background_jobs
923 .fetch_sub(1, Ordering::Relaxed)
924 == 1
925 {
926 self.event_background_done.notify(usize::MAX);
927 }
928 }
929
/// Returns the number of foreground jobs that are currently active.
pub fn get_in_progress_count(&self) -> usize {
    self.currently_scheduled_foreground_jobs
        .load(Ordering::Acquire)
}
934
935 pub async fn wait_task_completion(
947 &self,
948 id: TaskId,
949 consistency: ReadConsistency,
950 ) -> Result<()> {
951 read_task_output(
952 self,
953 id,
954 ReadOutputOptions {
955 tracking: ReadTracking::Untracked,
957 consistency,
958 },
959 )
960 .await?;
961 Ok(())
962 }
963
/// Like [`Self::aggregated_update_info`], but without a timeout.
///
/// The `unwrap` cannot fail through a timeout, because
/// `aggregated_update_info` only returns `None` on its timeout path, which
/// is skipped for `Duration::MAX`.
pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
    self.aggregated_update_info(aggregation, Duration::MAX)
        .await
        .unwrap()
}
971
/// Returns the accumulated [`UpdateInfo`] and resets the accumulation.
///
/// * `aggregation`: when non-zero, keeps waiting until no foreground phase
///   has finished for this long, so that bursts are reported as one update.
/// * `timeout`: maximum time to wait for a running foreground phase to
///   finish; `Duration::MAX` waits indefinitely.
///
/// Returns `None` when the timeout elapsed (or was zero) before an update
/// became available.
///
/// # Panics
///
/// Panics when no update is stored at the end, which happens when the
/// method is called concurrently.
pub async fn aggregated_update_info(
    &self,
    aggregation: Duration,
    timeout: Duration,
) -> Option<UpdateInfo> {
    // Register the listener before inspecting the state, so a notification
    // sent in between is not missed.
    let listener = self
        .event_foreground_done
        .listen_with_note(|| || "wait for update info".to_string());
    let wait_for_finish = {
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if aggregation.is_zero() {
            // Without aggregation an already available update is returned
            // immediately.
            if let Some((duration, tasks)) = update.take() {
                return Some(UpdateInfo {
                    duration,
                    tasks,
                    reasons: take(reason_set),
                    placeholder_for_future_fields: (),
                });
            } else {
                true
            }
        } else {
            update.is_none()
        }
    };
    if wait_for_finish {
        if timeout == Duration::MAX {
            // Wait for the foreground work to finish, without a time limit.
            listener.await;
        } else {
            // Wait for foreground work to start (if none is running), then for
            // it to finish or for the timeout.
            let start_listener = self
                .event_foreground_start
                .listen_with_note(|| || "wait for update info".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                == 0
            {
                start_listener.await;
            } else {
                drop(start_listener);
            }
            if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                // Timed out.
                return None;
            }
        }
    }
    if !aggregation.is_zero() {
        loop {
            select! {
                () = tokio::time::sleep(aggregation) => {
                    break;
                }
                () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                    // Another phase finished: the next iteration starts a new sleep.
                }
            }
        }
    }
    let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
    if let Some((duration, tasks)) = update.take() {
        Some(UpdateInfo {
            duration,
            tasks,
            reasons: take(reason_set),
            placeholder_for_future_fields: (),
        })
    } else {
        panic!("aggregated_update_info must not called concurrently")
    }
}
1048
/// Waits until no background job is active anymore; returns immediately
/// when none is.
pub async fn wait_background_done(&self) {
    // Register the listener before reading the counter, so a notification
    // sent between the check and the await is not missed.
    let listener = self.event_background_done.listen();
    if self
        .currently_scheduled_background_jobs
        .load(Ordering::Acquire)
        != 0
    {
        listener.await;
    }
}
1059
/// Stops the engine: notifies the backend, sets the `stopped` flag, waits
/// for the active foreground jobs and then the active background jobs to
/// finish, and finally calls the backend's `stop` hook.
pub async fn stop_and_wait(&self) {
    turbo_tasks_future_scope(self.pin(), async move {
        self.backend.stopping(self);
        self.stopped.store(true, Ordering::Release);
        {
            // The listener is registered before the counter is read, so a
            // notification in between is not missed.
            let listener = self
                .event_foreground_done
                .listen_with_note(|| || "wait for stop".to_string());
            if self
                .currently_scheduled_foreground_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        {
            let listener = self.event_background_done.listen();
            if self
                .currently_scheduled_background_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        self.backend.stop(self);
    })
    .await;
}
1090
/// Spawns `func` on tokio as a foreground job.
///
/// `func` receives the engine handle and must return it again. It is
/// skipped when the engine has already been stopped; the job is marked as
/// finished in either case.
#[track_caller]
pub(crate) fn schedule_foreground_job<T>(&self, func: T)
where
    T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
    T::CallOnceFuture: Send,
{
    let mut this = self.pin();
    // Count the job before spawning, so the engine is not considered idle
    // while the tokio task is still waiting to run.
    this.begin_foreground_job();
    tokio::spawn(
        TURBO_TASKS
            .scope(this.clone(), async move {
                if !this.stopped.load(Ordering::Acquire) {
                    this = func(this.clone()).await;
                }
                this.finish_foreground_job();
            })
            .in_current_span(),
    );
}
1110
/// Spawns `func` on tokio as a background job.
///
/// `func` receives the engine handle and must return it again. It is
/// skipped when the engine has already been stopped; the job is marked as
/// finished in either case.
#[track_caller]
pub(crate) fn schedule_background_job<T>(&self, func: T)
where
    T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
    T::CallOnceFuture: Send,
{
    let mut this = self.pin();
    // Count the job before spawning the tokio task.
    self.begin_background_job();
    tokio::spawn(
        TURBO_TASKS
            .scope(this.clone(), async move {
                if !this.stopped.load(Ordering::Acquire) {
                    this = func(this).await;
                }
                this.finish_background_job();
            })
            .in_current_span(),
    );
}
1130
1131 fn finish_current_task_state(&self) -> FinishedTaskState {
1132 let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1133 let CurrentTaskState {
1134 stateful,
1135 has_invalidator,
1136 ..
1137 } = &mut *cell.write().unwrap();
1138 (*stateful, *has_invalidator)
1139 });
1140
1141 FinishedTaskState {
1142 stateful,
1143 has_invalidator,
1144 }
1145 }
1146
/// Returns the backend this engine was created with.
pub fn backend(&self) -> &B {
    &self.backend
}
1150}
1151
/// Flags copied out of [`CurrentTaskState`] when an execution finishes; both
/// are passed on to the backend's `task_execution_completed`.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`.
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1159
/// Object-safe facade over the inherent methods of [`TurboTasks`].
///
/// Inherent methods take precedence in method resolution, so the
/// `self.dynamic_call(..)`-style calls below reach the inherent
/// implementations and do not recurse into this trait impl.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to the inherent `dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `native_call`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `trait_call`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    /// Boxes the inherent `run`; the returned future owns an engine handle.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Boxes the inherent `run_once`; the returned future owns an engine
    /// handle.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Records `reason` in the aggregated update right away and then behaves
    /// like `run_once`.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // Scoped so the lock is released before the future is created.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Forwards to the inherent `start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }
}
1226
1227impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
/// Asks the backend to invalidate `task`.
#[instrument(level = "info", skip_all, name = "invalidate")]
fn invalidate(&self, task: TaskId) {
    self.backend.invalidate_task(task, self);
}
1232
1233 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1234 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1235 {
1236 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1237 reason_set.insert(reason);
1238 }
1239 self.backend.invalidate_task(task, self);
1240 }
1241
/// Asks the backend to invalidate the serialization of `task`.
fn invalidate_serialization(&self, task: TaskId) {
    self.backend.invalidate_serialization(task, self);
}
1245
1246 fn try_read_task_output(
1247 &self,
1248 task: TaskId,
1249 options: ReadOutputOptions,
1250 ) -> Result<Result<RawVc, EventListener>> {
1251 self.backend.try_read_task_output(
1252 task,
1253 current_task_if_available("reading Vcs"),
1254 options,
1255 self,
1256 )
1257 }
1258
1259 fn try_read_task_cell(
1260 &self,
1261 task: TaskId,
1262 index: CellId,
1263 options: ReadCellOptions,
1264 ) -> Result<Result<TypedCellContent, EventListener>> {
1265 self.backend.try_read_task_cell(
1266 task,
1267 index,
1268 current_task_if_available("reading Vcs"),
1269 options,
1270 self,
1271 )
1272 }
1273
/// Asks the backend for the cell `index` of `current_task` itself; no
/// reader task is passed along.
fn try_read_own_task_cell(
    &self,
    current_task: TaskId,
    index: CellId,
    options: ReadCellOptions,
) -> Result<TypedCellContent> {
    self.backend
        .try_read_own_task_cell(current_task, index, options, self)
}
1283
1284 fn try_read_local_output(
1285 &self,
1286 execution_id: ExecutionId,
1287 local_task_id: LocalTaskId,
1288 ) -> Result<Result<RawVc, EventListener>> {
1289 CURRENT_TASK_STATE.with(|gts| {
1290 let gts_read = gts.read().unwrap();
1291
1292 gts_read.assert_execution_id(execution_id);
1297
1298 match gts_read.get_local_task(local_task_id) {
1299 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1300 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1301 }
1302 })
1303 }
1304
1305 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1306 self.backend.read_task_collectibles(
1307 task,
1308 trait_id,
1309 current_task_if_available("reading collectibles"),
1310 self,
1311 )
1312 }
1313
1314 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1315 self.backend.emit_collectible(
1316 trait_type,
1317 collectible,
1318 current_task("emitting collectible"),
1319 self,
1320 );
1321 }
1322
1323 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1324 self.backend.unemit_collectible(
1325 trait_type,
1326 collectible,
1327 count,
1328 current_task("emitting collectible"),
1329 self,
1330 );
1331 }
1332
1333 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1334 for (&collectible, &count) in collectibles {
1335 if count > 0 {
1336 self.backend.unemit_collectible(
1337 trait_type,
1338 collectible,
1339 count as u32,
1340 current_task("emitting collectible"),
1341 self,
1342 );
1343 }
1344 }
1345 }
1346
1347 fn read_own_task_cell(
1348 &self,
1349 task: TaskId,
1350 index: CellId,
1351 options: ReadCellOptions,
1352 ) -> Result<TypedCellContent> {
1353 self.try_read_own_task_cell(task, index, options)
1354 }
1355
1356 fn update_own_task_cell(
1357 &self,
1358 task: TaskId,
1359 index: CellId,
1360 is_serializable_cell_content: bool,
1361 content: CellContent,
1362 verification_mode: VerificationMode,
1363 ) {
1364 self.backend.update_task_cell(
1365 task,
1366 index,
1367 is_serializable_cell_content,
1368 content,
1369 verification_mode,
1370 self,
1371 );
1372 }
1373
1374 fn connect_task(&self, task: TaskId) {
1375 self.backend
1376 .connect_task(task, current_task_if_available("connecting task"), self);
1377 }
1378
1379 fn mark_own_task_as_finished(&self, task: TaskId) {
1380 self.backend.mark_own_task_as_finished(task, self);
1381 }
1382
1383 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1384 self.backend
1385 .set_own_task_aggregation_number(task, aggregation_number, self);
1386 }
1387
1388 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1389 self.backend.mark_own_task_as_session_dependent(task, self);
1390 }
1391
1392 fn detached_for_testing(
1395 &self,
1396 fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1397 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1398 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1401 let tracked_fut = {
1402 let ts = global_task_state.read().unwrap();
1403 ts.local_task_tracker.track_future(fut)
1404 };
1405 Box::pin(TURBO_TASKS.scope(
1406 turbo_tasks(),
1407 CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1408 ))
1409 }
1410
1411 fn task_statistics(&self) -> &TaskStatisticsApi {
1412 self.backend.task_statistics()
1413 }
1414
1415 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1416 let this = self.pin();
1417 Box::pin(async move {
1418 this.stop_and_wait().await;
1419 })
1420 }
1421
1422 fn subscribe_to_compilation_events(
1423 &self,
1424 event_types: Option<Vec<String>>,
1425 ) -> Receiver<Arc<dyn CompilationEvent>> {
1426 self.compilation_events.subscribe(event_types)
1427 }
1428
1429 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1430 if let Err(e) = self.compilation_events.send(event) {
1431 tracing::warn!("Failed to send compilation event: {e}");
1432 }
1433 }
1434
1435 fn is_tracking_dependencies(&self) -> bool {
1436 self.backend.is_tracking_dependencies()
1437 }
1438}
1439
1440impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1441 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1442 self.pin()
1443 }
1444 fn backend(&self) -> &B {
1445 &self.backend
1446 }
1447
1448 #[track_caller]
1449 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1450 self.schedule_background_job(async move |this| {
1451 this.backend.run_backend_job(job, &*this).await;
1452 this
1453 })
1454 }
1455
1456 #[track_caller]
1457 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1458 self.schedule_foreground_job(async move |this| {
1459 this.backend.run_backend_job(job, &*this).await;
1460 this
1461 })
1462 }
1463
1464 #[track_caller]
1465 fn schedule(&self, task: TaskId) {
1466 self.schedule(task)
1467 }
1468
1469 fn program_duration_until(&self, instant: Instant) -> Duration {
1470 instant - self.program_start
1471 }
1472
1473 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1474 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1476 }
1477
1478 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1479 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1481 }
1482
1483 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1484 unsafe { self.task_id_factory.reuse(id.into()) }
1485 }
1486
1487 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1488 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1489 }
1490
1491 fn is_idle(&self) -> bool {
1492 self.currently_scheduled_foreground_jobs
1493 .load(Ordering::Acquire)
1494 == 0
1495 }
1496}
1497
1498pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1499 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1500 Ok(id) => id,
1501 Err(_) => panic!(
1502 "{from} can only be used in the context of a turbo_tasks task execution or \
1503 turbo_tasks run"
1504 ),
1505 }
1506}
1507
1508pub(crate) fn current_task(from: &str) -> TaskId {
1509 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1510 Ok(Some(id)) => id,
1511 Ok(None) | Err(_) => {
1512 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1513 }
1514 }
1515}
1516
1517pub async fn run<T: Send + 'static>(
1518 tt: Arc<dyn TurboTasksApi>,
1519 future: impl Future<Output = Result<T>> + Send + 'static,
1520) -> Result<T> {
1521 let (tx, rx) = tokio::sync::oneshot::channel();
1522
1523 tt.run(Box::pin(async move {
1524 let result = future.await?;
1525 tx.send(result)
1526 .map_err(|_| anyhow!("unable to send result"))?;
1527 Ok(())
1528 }))
1529 .await?;
1530
1531 Ok(rx.await?)
1532}
1533
1534pub async fn run_once<T: Send + 'static>(
1535 tt: Arc<dyn TurboTasksApi>,
1536 future: impl Future<Output = Result<T>> + Send + 'static,
1537) -> Result<T> {
1538 let (tx, rx) = tokio::sync::oneshot::channel();
1539
1540 tt.run_once(Box::pin(async move {
1541 let result = future.await?;
1542 tx.send(result)
1543 .map_err(|_| anyhow!("unable to send result"))?;
1544 Ok(())
1545 }))
1546 .await?;
1547
1548 Ok(rx.await?)
1549}
1550
1551pub async fn run_once_with_reason<T: Send + 'static>(
1552 tt: Arc<dyn TurboTasksApi>,
1553 reason: impl InvalidationReason,
1554 future: impl Future<Output = Result<T>> + Send + 'static,
1555) -> Result<T> {
1556 let (tx, rx) = tokio::sync::oneshot::channel();
1557
1558 tt.run_once_with_reason(
1559 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1560 Box::pin(async move {
1561 let result = future.await?;
1562 tx.send(result)
1563 .map_err(|_| anyhow!("unable to send result"))?;
1564 Ok(())
1565 }),
1566 )
1567 .await?;
1568
1569 Ok(rx.await?)
1570}
1571
1572pub fn dynamic_call(
1574 func: &'static NativeFunction,
1575 this: Option<RawVc>,
1576 arg: Box<dyn MagicAny>,
1577 persistence: TaskPersistence,
1578) -> RawVc {
1579 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1580}
1581
1582pub fn trait_call(
1584 trait_method: &'static TraitMethod,
1585 this: RawVc,
1586 arg: Box<dyn MagicAny>,
1587 persistence: TaskPersistence,
1588) -> RawVc {
1589 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1590}
1591
1592pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1593 TURBO_TASKS.with(|arc| arc.clone())
1594}
1595
1596pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1597 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1598}
1599
1600pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1601 TURBO_TASKS.with(|arc| func(arc))
1602}
1603
1604pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1605 TURBO_TASKS.sync_scope(tt, f)
1606}
1607
1608pub fn turbo_tasks_future_scope<T>(
1609 tt: Arc<dyn TurboTasksApi>,
1610 f: impl Future<Output = T>,
1611) -> impl Future<Output = T> {
1612 TURBO_TASKS.scope(tt, f)
1613}
1614
1615pub fn with_turbo_tasks_for_testing<T>(
1616 tt: Arc<dyn TurboTasksApi>,
1617 current_task: TaskId,
1618 execution_id: ExecutionId,
1619 f: impl Future<Output = T>,
1620) -> impl Future<Output = T> {
1621 TURBO_TASKS.scope(
1622 tt,
1623 CURRENT_TASK_STATE.scope(
1624 Arc::new(RwLock::new(CurrentTaskState::new(
1625 current_task,
1626 execution_id,
1627 ))),
1628 f,
1629 ),
1630 )
1631}
1632
1633pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1638 tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1639}
1640
1641pub fn current_task_for_testing() -> Option<TaskId> {
1642 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1643}
1644
1645pub fn mark_session_dependent() {
1647 with_turbo_tasks(|tt| {
1648 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1649 });
1650}
1651
1652pub fn mark_root() {
1655 with_turbo_tasks(|tt| {
1656 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1657 });
1658}
1659
1660pub fn mark_finished() {
1663 with_turbo_tasks(|tt| {
1664 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1665 });
1666}
1667
1668pub fn mark_stateful() -> SerializationInvalidator {
1674 CURRENT_TASK_STATE.with(|cell| {
1675 let CurrentTaskState {
1676 stateful, task_id, ..
1677 } = &mut *cell.write().unwrap();
1678 *stateful = true;
1679 let Some(task_id) = *task_id else {
1680 panic!(
1681 "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1682 );
1683 };
1684 SerializationInvalidator::new(task_id)
1685 })
1686}
1687
1688pub fn mark_invalidator() {
1689 CURRENT_TASK_STATE.with(|cell| {
1690 let CurrentTaskState {
1691 has_invalidator, ..
1692 } = &mut *cell.write().unwrap();
1693 *has_invalidator = true;
1694 })
1695}
1696
1697pub fn prevent_gc() {
1698 mark_stateful();
1700}
1701
1702pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1703 with_turbo_tasks(|tt| {
1704 let raw_vc = collectible.node.node;
1705 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1706 })
1707}
1708
1709pub(crate) async fn read_task_output(
1710 this: &dyn TurboTasksApi,
1711 id: TaskId,
1712 options: ReadOutputOptions,
1713) -> Result<RawVc> {
1714 loop {
1715 match this.try_read_task_output(id, options)? {
1716 Ok(result) => return Ok(result),
1717 Err(listener) => listener.await,
1718 }
1719 }
1720}
1721
1722pub(crate) async fn read_task_cell(
1723 this: &dyn TurboTasksApi,
1724 id: TaskId,
1725 index: CellId,
1726 options: ReadCellOptions,
1727) -> Result<TypedCellContent> {
1728 loop {
1729 match this.try_read_task_cell(id, index, options)? {
1730 Ok(result) => return Ok(result),
1731 Err(listener) => listener.await,
1732 }
1733 }
1734}
1735
/// A handle to one cell of a task, identified by the task id and the cell id.
///
/// Instances are created by `find_cell_by_id`, which fills in the currently
/// executing task. The handle is `Copy` and converts into a `RawVc::TaskCell`.
#[derive(Clone, Copy)]
pub struct CurrentCellRef {
    /// The task that owns the cell.
    current_task: TaskId,
    /// The value type and per-type index of the cell within the task.
    index: CellId,
    /// Passed along with every read and update of the cell.
    is_serializable_cell_content: bool,
}
1747
/// Shorthand for the `Repr` associated type of `T`'s `VcRead` implementation
/// (`T::Read`).
type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1749
1750impl CurrentCellRef {
1751 pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1753 where
1754 T: VcValueType,
1755 {
1756 self.conditional_update_with_shared_reference(|old_shared_reference| {
1757 let old_ref = old_shared_reference
1758 .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1759 .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1760 let new_value = functor(old_ref)?;
1761 Some(SharedReference::new(triomphe::Arc::new(
1762 <T::Read as VcRead<T>>::value_to_repr(new_value),
1763 )))
1764 })
1765 }
1766
1767 pub fn conditional_update_with_shared_reference(
1769 &self,
1770 functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1771 ) {
1772 let tt = turbo_tasks();
1773 let cell_content = tt
1774 .read_own_task_cell(
1775 self.current_task,
1776 self.index,
1777 ReadCellOptions {
1778 tracking: ReadTracking::Untracked,
1780 is_serializable_cell_content: self.is_serializable_cell_content,
1781 final_read_hint: false,
1782 },
1783 )
1784 .ok();
1785 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1786 if let Some(update) = update {
1787 tt.update_own_task_cell(
1788 self.current_task,
1789 self.index,
1790 self.is_serializable_cell_content,
1791 CellContent(Some(update)),
1792 VerificationMode::EqualityCheck,
1793 )
1794 }
1795 }
1796
1797 pub fn compare_and_update<T>(&self, new_value: T)
1831 where
1832 T: PartialEq + VcValueType,
1833 {
1834 self.conditional_update(|old_value| {
1835 if let Some(old_value) = old_value
1836 && old_value == &new_value
1837 {
1838 return None;
1839 }
1840 Some(new_value)
1841 });
1842 }
1843
1844 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1853 where
1854 T: VcValueType + PartialEq,
1855 {
1856 fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1857 <T::Read as VcRead<T>>::repr_to_value_ref(
1858 sr.0.downcast_ref::<VcReadRepr<T>>()
1859 .expect("cannot update SharedReference of different type"),
1860 )
1861 }
1862 self.conditional_update_with_shared_reference(|old_sr| {
1863 if let Some(old_sr) = old_sr {
1864 let old_value: &T = extract_sr_value(old_sr);
1865 let new_value = extract_sr_value(&new_shared_reference);
1866 if old_value == new_value {
1867 return None;
1868 }
1869 }
1870 Some(new_shared_reference)
1871 });
1872 }
1873
1874 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
1876 where
1877 T: VcValueType,
1878 {
1879 let tt = turbo_tasks();
1880 tt.update_own_task_cell(
1881 self.current_task,
1882 self.index,
1883 self.is_serializable_cell_content,
1884 CellContent(Some(SharedReference::new(triomphe::Arc::new(
1885 <T::Read as VcRead<T>>::value_to_repr(new_value),
1886 )))),
1887 verification_mode,
1888 )
1889 }
1890
1891 pub fn update_with_shared_reference(
1900 &self,
1901 shared_ref: SharedReference,
1902 verification_mode: VerificationMode,
1903 ) {
1904 let tt = turbo_tasks();
1905 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
1906 let content = tt
1907 .read_own_task_cell(
1908 self.current_task,
1909 self.index,
1910 ReadCellOptions {
1911 tracking: ReadTracking::Untracked,
1913 is_serializable_cell_content: self.is_serializable_cell_content,
1914 final_read_hint: false,
1915 },
1916 )
1917 .ok();
1918 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1919 shared_ref_exp != shared_ref
1921 } else {
1922 true
1923 }
1924 } else {
1925 true
1926 };
1927 if update {
1928 tt.update_own_task_cell(
1929 self.current_task,
1930 self.index,
1931 self.is_serializable_cell_content,
1932 CellContent(Some(shared_ref)),
1933 verification_mode,
1934 )
1935 }
1936 }
1937}
1938
1939impl From<CurrentCellRef> for RawVc {
1940 fn from(cell: CurrentCellRef) -> Self {
1941 RawVc::TaskCell(cell.current_task, cell.index)
1942 }
1943}
1944
1945pub fn find_cell_by_type<T: VcValueType>() -> CurrentCellRef {
1946 find_cell_by_id(T::get_value_type_id(), T::has_serialization())
1947}
1948
1949pub fn find_cell_by_id(ty: ValueTypeId, is_serializable_cell_content: bool) -> CurrentCellRef {
1950 CURRENT_TASK_STATE.with(|ts| {
1951 let current_task = current_task("celling turbo_tasks values");
1952 let mut ts = ts.write().unwrap();
1953 let map = ts.cell_counters.as_mut().unwrap();
1954 let current_index = map.entry(ty).or_default();
1955 let index = *current_index;
1956 *current_index += 1;
1957 CurrentCellRef {
1958 current_task,
1959 index: CellId { type_id: ty, index },
1960 is_serializable_cell_content,
1961 }
1962 })
1963}
1964
1965pub(crate) async fn read_local_output(
1966 this: &dyn TurboTasksApi,
1967 execution_id: ExecutionId,
1968 local_task_id: LocalTaskId,
1969) -> Result<RawVc> {
1970 loop {
1971 match this.try_read_local_output(execution_id, local_task_id)? {
1972 Ok(raw_vc) => return Ok(raw_vc),
1973 Err(event_listener) => event_listener.await,
1974 }
1975 }
1976}