1use std::{
2 fmt::Display,
3 future::Future,
4 hash::BuildHasherDefault,
5 mem::take,
6 pin::Pin,
7 sync::{
8 Arc, Mutex, RwLock, Weak,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use tokio::{select, sync::mpsc::Receiver, task_local};
19use tokio_util::task::TaskTracker;
20use tracing::{Instrument, instrument};
21
22use crate::{
23 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
24 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
25 VcValueTrait, VcValueType,
26 backend::{
27 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
28 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
29 },
30 capture_future::CaptureFuture,
31 event::{Event, EventListener},
32 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
33 id_factory::IdFactoryWithReuse,
34 macro_helpers::NativeFunction,
35 magic_any::MagicAny,
36 message_queue::{CompilationEvent, CompilationEventQueue},
37 raw_vc::{CellId, RawVc},
38 registry,
39 serialization_invalidation::SerializationInvalidator,
40 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
41 task_statistics::TaskStatisticsApi,
42 trace::TraceRawVcs,
43 util::{IdFactory, StaticOrArc},
44};
45
46pub trait TurboTasksCallApi: Sync + Send {
49 fn dynamic_call(
52 &self,
53 native_fn: &'static NativeFunction,
54 this: Option<RawVc>,
55 arg: Box<dyn MagicAny>,
56 persistence: TaskPersistence,
57 ) -> RawVc;
58 fn native_call(
61 &self,
62 native_fn: &'static NativeFunction,
63 this: Option<RawVc>,
64 arg: Box<dyn MagicAny>,
65 persistence: TaskPersistence,
66 ) -> RawVc;
67 fn trait_call(
70 &self,
71 trait_method: &'static TraitMethod,
72 this: RawVc,
73 arg: Box<dyn MagicAny>,
74 persistence: TaskPersistence,
75 ) -> RawVc;
76
77 fn run(
78 &self,
79 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
81 fn run_once(
82 &self,
83 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
85 fn run_once_with_reason(
86 &self,
87 reason: StaticOrArc<dyn InvalidationReason>,
88 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
90 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
91}
92
93pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
99 fn invalidate(&self, task: TaskId);
100 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
101
102 fn invalidate_serialization(&self, task: TaskId);
103
104 fn try_read_task_output(
105 &self,
106 task: TaskId,
107 options: ReadOutputOptions,
108 ) -> Result<Result<RawVc, EventListener>>;
109
110 fn try_read_task_cell(
111 &self,
112 task: TaskId,
113 index: CellId,
114 options: ReadCellOptions,
115 ) -> Result<Result<TypedCellContent, EventListener>>;
116
117 fn try_read_local_output(
132 &self,
133 execution_id: ExecutionId,
134 local_task_id: LocalTaskId,
135 ) -> Result<Result<RawVc, EventListener>>;
136
137 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
138
139 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
140 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
141 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
142
143 fn try_read_own_task_cell(
146 &self,
147 current_task: TaskId,
148 index: CellId,
149 options: ReadCellOptions,
150 ) -> Result<TypedCellContent>;
151
152 fn read_own_task_cell(
153 &self,
154 task: TaskId,
155 index: CellId,
156 options: ReadCellOptions,
157 ) -> Result<TypedCellContent>;
158 fn update_own_task_cell(
159 &self,
160 task: TaskId,
161 index: CellId,
162 content: CellContent,
163 verification_mode: VerificationMode,
164 );
165 fn mark_own_task_as_finished(&self, task: TaskId);
166 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
167 fn mark_own_task_as_session_dependent(&self, task: TaskId);
168
169 fn connect_task(&self, task: TaskId);
170
171 fn detached_for_testing(
176 &self,
177 f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
178 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
179
180 fn task_statistics(&self) -> &TaskStatisticsApi;
181
182 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
183
184 fn subscribe_to_compilation_events(
185 &self,
186 event_types: Option<Vec<String>>,
187 ) -> Receiver<Arc<dyn CompilationEvent>>;
188 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
189
190 fn is_tracking_dependencies(&self) -> bool;
192}
193
194pub struct Unused<T> {
196 inner: T,
197}
198
199impl<T> Unused<T> {
200 pub unsafe fn new_unchecked(inner: T) -> Self {
206 Self { inner }
207 }
208
209 pub unsafe fn get_unchecked(&self) -> &T {
215 &self.inner
216 }
217
218 pub fn into(self) -> T {
220 self.inner
221 }
222}
223
224pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
226 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
227
228 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
229 fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
230 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
234 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
238
239 fn schedule(&self, task: TaskId);
241
242 fn schedule_backend_foreground_job(&self, job: B::BackendJob);
244
245 fn schedule_backend_background_job(&self, job: B::BackendJob);
250
251 fn program_duration_until(&self, instant: Instant) -> Duration;
253
254 fn is_idle(&self) -> bool;
256
257 fn backend(&self) -> &B;
259}
260
261#[allow(clippy::manual_non_exhaustive)]
262pub struct UpdateInfo {
263 pub duration: Duration,
264 pub tasks: usize,
265 pub reasons: InvalidationReasonSet,
266 #[allow(dead_code)]
267 placeholder_for_future_fields: (),
268}
269
270#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
271pub enum TaskPersistence {
272 Persistent,
274
275 Transient,
282}
283
284#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
285pub enum ReadConsistency {
286 #[default]
289 Eventual,
290 Strong,
295}
296
297#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
298pub enum ReadTracking {
299 #[default]
301 Tracked,
302 TrackOnlyError,
307 Untracked,
312}
313
314impl ReadTracking {
315 pub fn should_track(&self, is_err: bool) -> bool {
316 match self {
317 ReadTracking::Tracked => true,
318 ReadTracking::TrackOnlyError => is_err,
319 ReadTracking::Untracked => false,
320 }
321 }
322}
323
324impl Display for ReadTracking {
325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326 match self {
327 ReadTracking::Tracked => write!(f, "tracked"),
328 ReadTracking::TrackOnlyError => write!(f, "track only error"),
329 ReadTracking::Untracked => write!(f, "untracked"),
330 }
331 }
332}
333
334pub struct TurboTasks<B: Backend + 'static> {
335 this: Weak<Self>,
336 backend: B,
337 task_id_factory: IdFactoryWithReuse<TaskId>,
338 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
339 execution_id_factory: IdFactory<ExecutionId>,
340 stopped: AtomicBool,
341 currently_scheduled_foreground_jobs: AtomicUsize,
342 currently_scheduled_background_jobs: AtomicUsize,
343 scheduled_tasks: AtomicUsize,
344 start: Mutex<Option<Instant>>,
345 aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
346 event_foreground_start: Event,
348 event_foreground_done: Event,
351 event_background_done: Event,
353 program_start: Instant,
354 compilation_events: CompilationEventQueue,
355}
356
357struct CurrentTaskState {
366 task_id: Option<TaskId>,
367 execution_id: ExecutionId,
368
369 stateful: bool,
371
372 has_invalidator: bool,
374
375 cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
380
381 local_tasks: Vec<LocalTask>,
383
384 local_task_tracker: TaskTracker,
387}
388
389impl CurrentTaskState {
390 fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
391 Self {
392 task_id: Some(task_id),
393 execution_id,
394 stateful: false,
395 has_invalidator: false,
396 cell_counters: Some(AutoMap::default()),
397 local_tasks: Vec::new(),
398 local_task_tracker: TaskTracker::new(),
399 }
400 }
401
402 fn new_temporary(execution_id: ExecutionId) -> Self {
403 Self {
404 task_id: None,
405 execution_id,
406 stateful: false,
407 has_invalidator: false,
408 cell_counters: None,
409 local_tasks: Vec::new(),
410 local_task_tracker: TaskTracker::new(),
411 }
412 }
413
414 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
415 if self.execution_id != expected_execution_id {
416 panic!(
417 "Local tasks can only be scheduled/awaited within the same execution of the \
418 parent task that created them"
419 );
420 }
421 }
422
423 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
424 self.local_tasks.push(local_task);
425 if cfg!(debug_assertions) {
427 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
428 } else {
429 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
430 }
431 }
432
433 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
434 &self.local_tasks[(*local_task_id as usize) - 1]
436 }
437
438 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
439 &mut self.local_tasks[(*local_task_id as usize) - 1]
440 }
441}
442
443task_local! {
445 static TURBO_TASKS: Arc<dyn TurboTasksApi>;
447
448 static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
449}
450
451impl<B: Backend + 'static> TurboTasks<B> {
452 pub fn new(backend: B) -> Arc<Self> {
458 let task_id_factory = IdFactoryWithReuse::new(
459 TaskId::MIN,
460 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
461 );
462 let transient_task_id_factory =
463 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
464 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
465 let this = Arc::new_cyclic(|this| Self {
466 this: this.clone(),
467 backend,
468 task_id_factory,
469 transient_task_id_factory,
470 execution_id_factory,
471 stopped: AtomicBool::new(false),
472 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
473 currently_scheduled_background_jobs: AtomicUsize::new(0),
474 scheduled_tasks: AtomicUsize::new(0),
475 start: Default::default(),
476 aggregated_update: Default::default(),
477 event_foreground_done: Event::new(|| {
478 || "TurboTasks::event_foreground_done".to_string()
479 }),
480 event_foreground_start: Event::new(|| {
481 || "TurboTasks::event_foreground_start".to_string()
482 }),
483 event_background_done: Event::new(|| {
484 || "TurboTasks::event_background_done".to_string()
485 }),
486 program_start: Instant::now(),
487 compilation_events: CompilationEventQueue::default(),
488 });
489 this.backend.startup(&*this);
490 this
491 }
492
493 pub fn pin(&self) -> Arc<Self> {
494 self.this.upgrade().unwrap()
495 }
496
497 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
499 where
500 T: ?Sized,
501 F: Fn() -> Fut + Send + Sync + Clone + 'static,
502 Fut: Future<Output = Result<Vc<T>>> + Send,
503 {
504 let id = self.backend.create_transient_task(
505 TransientTaskType::Root(Box::new(move || {
506 let functor = functor.clone();
507 Box::pin(async move {
508 let raw_vc = functor().await?.node;
509 raw_vc.to_non_local().await
510 })
511 })),
512 self,
513 );
514 self.schedule(id);
515 id
516 }
517
518 pub fn dispose_root_task(&self, task_id: TaskId) {
519 self.backend.dispose_root_task(task_id, self);
520 }
521
522 #[track_caller]
526 fn spawn_once_task<T, Fut>(&self, future: Fut)
527 where
528 T: ?Sized,
529 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
530 {
531 let id = self.backend.create_transient_task(
532 TransientTaskType::Once(Box::pin(async move {
533 let raw_vc = future.await?.node;
534 raw_vc.to_non_local().await
535 })),
536 self,
537 );
538 self.schedule(id);
539 }
540
541 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
542 &self,
543 future: impl Future<Output = Result<T>> + Send + 'static,
544 ) -> Result<T> {
545 let (tx, rx) = tokio::sync::oneshot::channel();
546 self.spawn_once_task(async move {
547 let result = future.await;
548 tx.send(result)
549 .map_err(|_| anyhow!("unable to send result"))?;
550 Ok(Completion::new())
551 });
552
553 rx.await?
554 }
555
556 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
557 pub async fn run<T: TraceRawVcs + Send + 'static>(
558 &self,
559 future: impl Future<Output = Result<T>> + Send + 'static,
560 ) -> Result<T, TurboTasksExecutionError> {
561 self.begin_foreground_job();
562 let execution_id = self.execution_id_factory.wrapping_get();
564 let current_task_state =
565 Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
566
567 let result = TURBO_TASKS
568 .scope(
569 self.pin(),
570 CURRENT_TASK_STATE.scope(current_task_state, async {
571 let result = CaptureFuture::new(future).await;
572
573 let ltt =
575 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
576 ltt.close();
577 ltt.wait().await;
578
579 match result {
580 Ok(Ok(raw_vc)) => Ok(raw_vc),
581 Ok(Err(err)) => Err(err.into()),
582 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
583 }
584 }),
585 )
586 .await;
587 self.finish_foreground_job();
588 result
589 }
590
591 pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
592 let this = self.pin();
593 tokio::spawn(async move {
594 this.pin()
595 .run_once(async move {
596 this.finish_foreground_job();
597 future.await;
598 this.begin_foreground_job();
599 Ok(())
600 })
601 .await
602 .unwrap()
603 });
604 }
605
606 pub(crate) fn native_call(
607 &self,
608 native_fn: &'static NativeFunction,
609 this: Option<RawVc>,
610 arg: Box<dyn MagicAny>,
611 persistence: TaskPersistence,
612 ) -> RawVc {
613 let task_type = CachedTaskType {
614 native_fn,
615 this,
616 arg,
617 };
618 RawVc::TaskOutput(match persistence {
619 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
620 task_type,
621 current_task_if_available("turbo_function calls"),
622 self,
623 ),
624 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
625 task_type,
626 current_task_if_available("turbo_function calls"),
627 self,
628 ),
629 })
630 }
631
632 pub fn dynamic_call(
633 &self,
634 native_fn: &'static NativeFunction,
635 this: Option<RawVc>,
636 arg: Box<dyn MagicAny>,
637 persistence: TaskPersistence,
638 ) -> RawVc {
639 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
640 return self.native_call(native_fn, this, arg, persistence);
641 }
642 let task_type = LocalTaskSpec {
643 task_type: LocalTaskType::ResolveNative { native_fn },
644 this,
645 arg,
646 };
647 self.schedule_local_task(task_type, persistence)
648 }
649
650 pub fn trait_call(
651 &self,
652 trait_method: &'static TraitMethod,
653 this: RawVc,
654 arg: Box<dyn MagicAny>,
655 persistence: TaskPersistence,
656 ) -> RawVc {
657 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
661 match registry::get_value_type(type_id).get_trait_method(trait_method) {
662 Some(native_fn) => {
663 let arg = native_fn.arg_meta.filter_owned(arg);
664 return self.dynamic_call(native_fn, Some(this), arg, persistence);
665 }
666 None => {
667 }
671 }
672 }
673
674 let task_type = LocalTaskSpec {
676 task_type: LocalTaskType::ResolveTrait { trait_method },
677 this: Some(this),
678 arg,
679 };
680
681 self.schedule_local_task(task_type, persistence)
682 }
683
684 #[track_caller]
685 pub(crate) fn schedule(&self, task_id: TaskId) {
686 self.begin_foreground_job();
687 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
688
689 let this = self.pin();
690 let future = async move {
691 let mut schedule_again = true;
692 while schedule_again {
693 let execution_id = this.execution_id_factory.wrapping_get();
695 let current_task_state =
696 Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
697 let single_execution_future = async {
698 if this.stopped.load(Ordering::Acquire) {
699 this.backend.task_execution_canceled(task_id, &*this);
700 return false;
701 }
702
703 let Some(TaskExecutionSpec { future, span }) =
704 this.backend.try_start_task_execution(task_id, &*this)
705 else {
706 return false;
707 };
708
709 async {
710 let result = CaptureFuture::new(future).await;
711
712 let ltt = CURRENT_TASK_STATE
714 .with(|ts| ts.read().unwrap().local_task_tracker.clone());
715 ltt.close();
716 ltt.wait().await;
717
718 let result = match result {
719 Ok(Ok(raw_vc)) => Ok(raw_vc),
720 Ok(Err(err)) => Err(err.into()),
721 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
722 };
723
724 let FinishedTaskState {
725 stateful,
726 has_invalidator,
727 } = this.finish_current_task_state();
728 let cell_counters = CURRENT_TASK_STATE
729 .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
730 this.backend.task_execution_completed(
731 task_id,
732 result,
733 &cell_counters,
734 stateful,
735 has_invalidator,
736 &*this,
737 )
738 }
739 .instrument(span)
740 .await
741 };
742 schedule_again = CURRENT_TASK_STATE
743 .scope(current_task_state, single_execution_future)
744 .await;
745 }
746 this.finish_foreground_job();
747 anyhow::Ok(())
748 };
749
750 let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
751
752 #[cfg(feature = "tokio_tracing")]
753 {
754 let description = self.backend.get_task_description(task_id);
755 tokio::task::Builder::new()
756 .name(&description)
757 .spawn(future)
758 .unwrap();
759 }
760 #[cfg(not(feature = "tokio_tracing"))]
761 tokio::task::spawn(future);
762 }
763
764 fn schedule_local_task(
765 &self,
766 ty: LocalTaskSpec,
767 persistence: TaskPersistence,
769 ) -> RawVc {
770 let task_type = ty.task_type;
771 let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
772 .with(|gts| {
773 let mut gts_write = gts.write().unwrap();
774 let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
775 done_event: Event::new(move || {
776 move || format!("LocalTask({task_type})::done_event")
777 }),
778 });
779 (
780 Arc::clone(gts),
781 gts_write.task_id,
782 gts_write.execution_id,
783 local_task_id,
784 )
785 });
786
787 #[cfg(feature = "tokio_tracing")]
788 let description = format!(
789 "[local] (parent: {}) {}",
790 self.backend.get_task_description(parent_task_id),
791 ty.task_type,
792 );
793 #[cfg(not(feature = "tokio_tracing"))]
794 let _ = parent_task_id; let this = self.pin();
797 let future = async move {
798 let span = match &ty.task_type {
799 LocalTaskType::ResolveNative { native_fn } => native_fn.resolve_span(),
800 LocalTaskType::ResolveTrait { trait_method } => trait_method.resolve_span(),
801 };
802 async move {
803 let result = match ty.task_type {
804 LocalTaskType::ResolveNative { native_fn } => {
805 LocalTaskType::run_resolve_native(
806 native_fn,
807 ty.this,
808 &*ty.arg,
809 persistence,
810 this,
811 )
812 .await
813 }
814 LocalTaskType::ResolveTrait { trait_method } => {
815 LocalTaskType::run_resolve_trait(
816 trait_method,
817 ty.this.unwrap(),
818 &*ty.arg,
819 persistence,
820 this,
821 )
822 .await
823 }
824 };
825
826 let output = match result {
827 Ok(raw_vc) => OutputContent::Link(raw_vc),
828 Err(err) => OutputContent::Error(
829 TurboTasksExecutionError::from(err).with_task_context(task_type),
830 ),
831 };
832
833 let local_task = LocalTask::Done { output };
834
835 let done_event = CURRENT_TASK_STATE.with(move |gts| {
836 let mut gts_write = gts.write().unwrap();
837 let scheduled_task =
838 std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
839 let LocalTask::Scheduled { done_event } = scheduled_task else {
840 panic!("local task finished, but was not in the scheduled state?");
841 };
842 done_event
843 });
844 done_event.notify(usize::MAX)
845 }
846 .instrument(span)
847 .await
848 };
849 let future = global_task_state
850 .read()
851 .unwrap()
852 .local_task_tracker
853 .track_future(future);
854 let future = CURRENT_TASK_STATE.scope(global_task_state, future);
855 let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
856
857 #[cfg(feature = "tokio_tracing")]
858 tokio::task::Builder::new()
859 .name(&description)
860 .spawn(future)
861 .unwrap();
862 #[cfg(not(feature = "tokio_tracing"))]
863 tokio::task::spawn(future);
864
865 RawVc::LocalOutput(execution_id, local_task_id, persistence)
866 }
867
868 fn begin_foreground_job(&self) {
869 if self
870 .currently_scheduled_foreground_jobs
871 .fetch_add(1, Ordering::AcqRel)
872 == 0
873 {
874 *self.start.lock().unwrap() = Some(Instant::now());
875 self.event_foreground_start.notify(usize::MAX);
876 self.backend.idle_end(self);
877 }
878 }
879
880 fn finish_foreground_job(&self) {
881 if self
882 .currently_scheduled_foreground_jobs
883 .fetch_sub(1, Ordering::AcqRel)
884 == 1
885 {
886 self.backend.idle_start(self);
887 let total = self.scheduled_tasks.load(Ordering::Acquire);
890 self.scheduled_tasks.store(0, Ordering::Release);
891 if let Some(start) = *self.start.lock().unwrap() {
892 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
893 if let Some(update) = update.as_mut() {
894 update.0 += start.elapsed();
895 update.1 += total;
896 } else {
897 *update = Some((start.elapsed(), total));
898 }
899 }
900 self.event_foreground_done.notify(usize::MAX);
901 }
902 }
903
904 fn begin_background_job(&self) {
905 self.currently_scheduled_background_jobs
906 .fetch_add(1, Ordering::Relaxed);
907 }
908
909 fn finish_background_job(&self) {
910 if self
911 .currently_scheduled_background_jobs
912 .fetch_sub(1, Ordering::Relaxed)
913 == 1
914 {
915 self.event_background_done.notify(usize::MAX);
916 }
917 }
918
919 pub fn get_in_progress_count(&self) -> usize {
920 self.currently_scheduled_foreground_jobs
921 .load(Ordering::Acquire)
922 }
923
924 pub async fn wait_task_completion(
936 &self,
937 id: TaskId,
938 consistency: ReadConsistency,
939 ) -> Result<()> {
940 read_task_output(
941 self,
942 id,
943 ReadOutputOptions {
944 tracking: ReadTracking::Untracked,
946 consistency,
947 },
948 )
949 .await?;
950 Ok(())
951 }
952
953 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
956 self.aggregated_update_info(aggregation, Duration::MAX)
957 .await
958 .unwrap()
959 }
960
961 pub async fn aggregated_update_info(
965 &self,
966 aggregation: Duration,
967 timeout: Duration,
968 ) -> Option<UpdateInfo> {
969 let listener = self
970 .event_foreground_done
971 .listen_with_note(|| || "wait for update info".to_string());
972 let wait_for_finish = {
973 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
974 if aggregation.is_zero() {
975 if let Some((duration, tasks)) = update.take() {
976 return Some(UpdateInfo {
977 duration,
978 tasks,
979 reasons: take(reason_set),
980 placeholder_for_future_fields: (),
981 });
982 } else {
983 true
984 }
985 } else {
986 update.is_none()
987 }
988 };
989 if wait_for_finish {
990 if timeout == Duration::MAX {
991 listener.await;
993 } else {
994 let start_listener = self
996 .event_foreground_start
997 .listen_with_note(|| || "wait for update info".to_string());
998 if self
999 .currently_scheduled_foreground_jobs
1000 .load(Ordering::Acquire)
1001 == 0
1002 {
1003 start_listener.await;
1004 } else {
1005 drop(start_listener);
1006 }
1007 if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1008 return None;
1010 }
1011 }
1012 }
1013 if !aggregation.is_zero() {
1014 loop {
1015 select! {
1016 () = tokio::time::sleep(aggregation) => {
1017 break;
1018 }
1019 () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1020 }
1022 }
1023 }
1024 }
1025 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1026 if let Some((duration, tasks)) = update.take() {
1027 Some(UpdateInfo {
1028 duration,
1029 tasks,
1030 reasons: take(reason_set),
1031 placeholder_for_future_fields: (),
1032 })
1033 } else {
1034 panic!("aggregated_update_info must not called concurrently")
1035 }
1036 }
1037
1038 pub async fn wait_background_done(&self) {
1039 let listener = self.event_background_done.listen();
1040 if self
1041 .currently_scheduled_background_jobs
1042 .load(Ordering::Acquire)
1043 != 0
1044 {
1045 listener.await;
1046 }
1047 }
1048
1049 pub async fn stop_and_wait(&self) {
1050 turbo_tasks_future_scope(self.pin(), async move {
1051 self.backend.stopping(self);
1052 self.stopped.store(true, Ordering::Release);
1053 {
1054 let listener = self
1055 .event_foreground_done
1056 .listen_with_note(|| || "wait for stop".to_string());
1057 if self
1058 .currently_scheduled_foreground_jobs
1059 .load(Ordering::Acquire)
1060 != 0
1061 {
1062 listener.await;
1063 }
1064 }
1065 {
1066 let listener = self.event_background_done.listen();
1067 if self
1068 .currently_scheduled_background_jobs
1069 .load(Ordering::Acquire)
1070 != 0
1071 {
1072 listener.await;
1073 }
1074 }
1075 self.backend.stop(self);
1076 })
1077 .await;
1078 }
1079
1080 #[track_caller]
1081 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1082 where
1083 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1084 T::CallOnceFuture: Send,
1085 {
1086 let mut this = self.pin();
1087 this.begin_foreground_job();
1088 tokio::spawn(
1089 TURBO_TASKS
1090 .scope(this.clone(), async move {
1091 if !this.stopped.load(Ordering::Acquire) {
1092 this = func(this.clone()).await;
1093 }
1094 this.finish_foreground_job();
1095 })
1096 .in_current_span(),
1097 );
1098 }
1099
1100 #[track_caller]
1101 pub(crate) fn schedule_background_job<T>(&self, func: T)
1102 where
1103 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1104 T::CallOnceFuture: Send,
1105 {
1106 let mut this = self.pin();
1107 self.begin_background_job();
1108 tokio::spawn(
1109 TURBO_TASKS
1110 .scope(this.clone(), async move {
1111 if !this.stopped.load(Ordering::Acquire) {
1112 this = func(this).await;
1113 }
1114 this.finish_background_job();
1115 })
1116 .in_current_span(),
1117 );
1118 }
1119
1120 fn finish_current_task_state(&self) -> FinishedTaskState {
1121 let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1122 let CurrentTaskState {
1123 stateful,
1124 has_invalidator,
1125 ..
1126 } = &mut *cell.write().unwrap();
1127 (*stateful, *has_invalidator)
1128 });
1129
1130 FinishedTaskState {
1131 stateful,
1132 has_invalidator,
1133 }
1134 }
1135
1136 pub fn backend(&self) -> &B {
1137 &self.backend
1138 }
1139}
1140
1141struct FinishedTaskState {
1142 stateful: bool,
1144
1145 has_invalidator: bool,
1147}
1148
1149impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1150 fn dynamic_call(
1151 &self,
1152 native_fn: &'static NativeFunction,
1153 this: Option<RawVc>,
1154 arg: Box<dyn MagicAny>,
1155 persistence: TaskPersistence,
1156 ) -> RawVc {
1157 self.dynamic_call(native_fn, this, arg, persistence)
1158 }
1159 fn native_call(
1160 &self,
1161 native_fn: &'static NativeFunction,
1162 this: Option<RawVc>,
1163 arg: Box<dyn MagicAny>,
1164 persistence: TaskPersistence,
1165 ) -> RawVc {
1166 self.native_call(native_fn, this, arg, persistence)
1167 }
1168 fn trait_call(
1169 &self,
1170 trait_method: &'static TraitMethod,
1171 this: RawVc,
1172 arg: Box<dyn MagicAny>,
1173 persistence: TaskPersistence,
1174 ) -> RawVc {
1175 self.trait_call(trait_method, this, arg, persistence)
1176 }
1177
1178 #[track_caller]
1179 fn run(
1180 &self,
1181 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1182 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1183 let this = self.pin();
1184 Box::pin(async move { this.run(future).await })
1185 }
1186
1187 #[track_caller]
1188 fn run_once(
1189 &self,
1190 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1191 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1192 let this = self.pin();
1193 Box::pin(async move { this.run_once(future).await })
1194 }
1195
1196 #[track_caller]
1197 fn run_once_with_reason(
1198 &self,
1199 reason: StaticOrArc<dyn InvalidationReason>,
1200 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1201 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1202 {
1203 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1204 reason_set.insert(reason);
1205 }
1206 let this = self.pin();
1207 Box::pin(async move { this.run_once(future).await })
1208 }
1209
1210 #[track_caller]
1211 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1212 self.start_once_process(future)
1213 }
1214}
1215
1216impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1217 #[instrument(level = "info", skip_all, name = "invalidate")]
1218 fn invalidate(&self, task: TaskId) {
1219 self.backend.invalidate_task(task, self);
1220 }
1221
1222 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1223 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1224 {
1225 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1226 reason_set.insert(reason);
1227 }
1228 self.backend.invalidate_task(task, self);
1229 }
1230
1231 fn invalidate_serialization(&self, task: TaskId) {
1232 self.backend.invalidate_serialization(task, self);
1233 }
1234
1235 fn try_read_task_output(
1236 &self,
1237 task: TaskId,
1238 options: ReadOutputOptions,
1239 ) -> Result<Result<RawVc, EventListener>> {
1240 self.backend.try_read_task_output(
1241 task,
1242 current_task_if_available("reading Vcs"),
1243 options,
1244 self,
1245 )
1246 }
1247
1248 fn try_read_task_cell(
1249 &self,
1250 task: TaskId,
1251 index: CellId,
1252 options: ReadCellOptions,
1253 ) -> Result<Result<TypedCellContent, EventListener>> {
1254 self.backend.try_read_task_cell(
1255 task,
1256 index,
1257 current_task_if_available("reading Vcs"),
1258 options,
1259 self,
1260 )
1261 }
1262
1263 fn try_read_own_task_cell(
1264 &self,
1265 current_task: TaskId,
1266 index: CellId,
1267 options: ReadCellOptions,
1268 ) -> Result<TypedCellContent> {
1269 self.backend
1270 .try_read_own_task_cell(current_task, index, options, self)
1271 }
1272
1273 fn try_read_local_output(
1274 &self,
1275 execution_id: ExecutionId,
1276 local_task_id: LocalTaskId,
1277 ) -> Result<Result<RawVc, EventListener>> {
1278 CURRENT_TASK_STATE.with(|gts| {
1279 let gts_read = gts.read().unwrap();
1280
1281 gts_read.assert_execution_id(execution_id);
1286
1287 match gts_read.get_local_task(local_task_id) {
1288 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1289 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1290 }
1291 })
1292 }
1293
1294 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1295 self.backend.read_task_collectibles(
1296 task,
1297 trait_id,
1298 current_task_if_available("reading collectibles"),
1299 self,
1300 )
1301 }
1302
1303 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1304 self.backend.emit_collectible(
1305 trait_type,
1306 collectible,
1307 current_task("emitting collectible"),
1308 self,
1309 );
1310 }
1311
1312 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1313 self.backend.unemit_collectible(
1314 trait_type,
1315 collectible,
1316 count,
1317 current_task("emitting collectible"),
1318 self,
1319 );
1320 }
1321
1322 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1323 for (&collectible, &count) in collectibles {
1324 if count > 0 {
1325 self.backend.unemit_collectible(
1326 trait_type,
1327 collectible,
1328 count as u32,
1329 current_task("emitting collectible"),
1330 self,
1331 );
1332 }
1333 }
1334 }
1335
1336 fn read_own_task_cell(
1337 &self,
1338 task: TaskId,
1339 index: CellId,
1340 options: ReadCellOptions,
1341 ) -> Result<TypedCellContent> {
1342 self.try_read_own_task_cell(task, index, options)
1343 }
1344
1345 fn update_own_task_cell(
1346 &self,
1347 task: TaskId,
1348 index: CellId,
1349 content: CellContent,
1350 verification_mode: VerificationMode,
1351 ) {
1352 self.backend
1353 .update_task_cell(task, index, content, verification_mode, self);
1354 }
1355
1356 fn connect_task(&self, task: TaskId) {
1357 self.backend
1358 .connect_task(task, current_task_if_available("connecting task"), self);
1359 }
1360
1361 fn mark_own_task_as_finished(&self, task: TaskId) {
1362 self.backend.mark_own_task_as_finished(task, self);
1363 }
1364
1365 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1366 self.backend
1367 .set_own_task_aggregation_number(task, aggregation_number, self);
1368 }
1369
1370 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1371 self.backend.mark_own_task_as_session_dependent(task, self);
1372 }
1373
1374 fn detached_for_testing(
1377 &self,
1378 fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1379 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1380 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1383 let tracked_fut = {
1384 let ts = global_task_state.read().unwrap();
1385 ts.local_task_tracker.track_future(fut)
1386 };
1387 Box::pin(TURBO_TASKS.scope(
1388 turbo_tasks(),
1389 CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1390 ))
1391 }
1392
1393 fn task_statistics(&self) -> &TaskStatisticsApi {
1394 self.backend.task_statistics()
1395 }
1396
1397 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1398 let this = self.pin();
1399 Box::pin(async move {
1400 this.stop_and_wait().await;
1401 })
1402 }
1403
1404 fn subscribe_to_compilation_events(
1405 &self,
1406 event_types: Option<Vec<String>>,
1407 ) -> Receiver<Arc<dyn CompilationEvent>> {
1408 self.compilation_events.subscribe(event_types)
1409 }
1410
1411 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1412 if let Err(e) = self.compilation_events.send(event) {
1413 tracing::warn!("Failed to send compilation event: {e}");
1414 }
1415 }
1416
1417 fn is_tracking_dependencies(&self) -> bool {
1418 self.backend.is_tracking_dependencies()
1419 }
1420}
1421
1422impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1423 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1424 self.pin()
1425 }
1426 fn backend(&self) -> &B {
1427 &self.backend
1428 }
1429
1430 #[track_caller]
1431 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1432 self.schedule_background_job(async move |this| {
1433 this.backend.run_backend_job(job, &*this).await;
1434 this
1435 })
1436 }
1437
1438 #[track_caller]
1439 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1440 self.schedule_foreground_job(async move |this| {
1441 this.backend.run_backend_job(job, &*this).await;
1442 this
1443 })
1444 }
1445
1446 #[track_caller]
1447 fn schedule(&self, task: TaskId) {
1448 self.schedule(task)
1449 }
1450
1451 fn program_duration_until(&self, instant: Instant) -> Duration {
1452 instant - self.program_start
1453 }
1454
1455 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1456 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1458 }
1459
1460 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1461 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1463 }
1464
1465 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1466 unsafe { self.task_id_factory.reuse(id.into()) }
1467 }
1468
1469 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1470 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1471 }
1472
1473 fn is_idle(&self) -> bool {
1474 self.currently_scheduled_foreground_jobs
1475 .load(Ordering::Acquire)
1476 == 0
1477 }
1478}
1479
1480pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1481 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1482 Ok(id) => id,
1483 Err(_) => panic!(
1484 "{from} can only be used in the context of a turbo_tasks task execution or \
1485 turbo_tasks run"
1486 ),
1487 }
1488}
1489
1490pub(crate) fn current_task(from: &str) -> TaskId {
1491 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1492 Ok(Some(id)) => id,
1493 Ok(None) | Err(_) => {
1494 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1495 }
1496 }
1497}
1498
1499pub async fn run<T: Send + 'static>(
1500 tt: Arc<dyn TurboTasksApi>,
1501 future: impl Future<Output = Result<T>> + Send + 'static,
1502) -> Result<T> {
1503 let (tx, rx) = tokio::sync::oneshot::channel();
1504
1505 tt.run(Box::pin(async move {
1506 let result = future.await?;
1507 tx.send(result)
1508 .map_err(|_| anyhow!("unable to send result"))?;
1509 Ok(())
1510 }))
1511 .await?;
1512
1513 Ok(rx.await?)
1514}
1515
1516pub async fn run_once<T: Send + 'static>(
1517 tt: Arc<dyn TurboTasksApi>,
1518 future: impl Future<Output = Result<T>> + Send + 'static,
1519) -> Result<T> {
1520 let (tx, rx) = tokio::sync::oneshot::channel();
1521
1522 tt.run_once(Box::pin(async move {
1523 let result = future.await?;
1524 tx.send(result)
1525 .map_err(|_| anyhow!("unable to send result"))?;
1526 Ok(())
1527 }))
1528 .await?;
1529
1530 Ok(rx.await?)
1531}
1532
1533pub async fn run_once_with_reason<T: Send + 'static>(
1534 tt: Arc<dyn TurboTasksApi>,
1535 reason: impl InvalidationReason,
1536 future: impl Future<Output = Result<T>> + Send + 'static,
1537) -> Result<T> {
1538 let (tx, rx) = tokio::sync::oneshot::channel();
1539
1540 tt.run_once_with_reason(
1541 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1542 Box::pin(async move {
1543 let result = future.await?;
1544 tx.send(result)
1545 .map_err(|_| anyhow!("unable to send result"))?;
1546 Ok(())
1547 }),
1548 )
1549 .await?;
1550
1551 Ok(rx.await?)
1552}
1553
1554pub fn dynamic_call(
1556 func: &'static NativeFunction,
1557 this: Option<RawVc>,
1558 arg: Box<dyn MagicAny>,
1559 persistence: TaskPersistence,
1560) -> RawVc {
1561 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1562}
1563
1564pub fn trait_call(
1566 trait_method: &'static TraitMethod,
1567 this: RawVc,
1568 arg: Box<dyn MagicAny>,
1569 persistence: TaskPersistence,
1570) -> RawVc {
1571 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1572}
1573
1574pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1575 TURBO_TASKS.with(|arc| arc.clone())
1576}
1577
1578pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1579 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1580}
1581
1582pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1583 TURBO_TASKS.with(|arc| func(arc))
1584}
1585
1586pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1587 TURBO_TASKS.sync_scope(tt, f)
1588}
1589
1590pub fn turbo_tasks_future_scope<T>(
1591 tt: Arc<dyn TurboTasksApi>,
1592 f: impl Future<Output = T>,
1593) -> impl Future<Output = T> {
1594 TURBO_TASKS.scope(tt, f)
1595}
1596
1597pub fn with_turbo_tasks_for_testing<T>(
1598 tt: Arc<dyn TurboTasksApi>,
1599 current_task: TaskId,
1600 execution_id: ExecutionId,
1601 f: impl Future<Output = T>,
1602) -> impl Future<Output = T> {
1603 TURBO_TASKS.scope(
1604 tt,
1605 CURRENT_TASK_STATE.scope(
1606 Arc::new(RwLock::new(CurrentTaskState::new(
1607 current_task,
1608 execution_id,
1609 ))),
1610 f,
1611 ),
1612 )
1613}
1614
1615pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1620 tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1621}
1622
1623pub fn current_task_for_testing() -> Option<TaskId> {
1624 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1625}
1626
1627pub fn mark_session_dependent() {
1629 with_turbo_tasks(|tt| {
1630 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1631 });
1632}
1633
1634pub fn mark_root() {
1637 with_turbo_tasks(|tt| {
1638 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1639 });
1640}
1641
1642pub fn mark_finished() {
1645 with_turbo_tasks(|tt| {
1646 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1647 });
1648}
1649
1650pub fn mark_stateful() -> SerializationInvalidator {
1656 CURRENT_TASK_STATE.with(|cell| {
1657 let CurrentTaskState {
1658 stateful, task_id, ..
1659 } = &mut *cell.write().unwrap();
1660 *stateful = true;
1661 let Some(task_id) = *task_id else {
1662 panic!(
1663 "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1664 );
1665 };
1666 SerializationInvalidator::new(task_id)
1667 })
1668}
1669
1670pub fn mark_invalidator() {
1671 CURRENT_TASK_STATE.with(|cell| {
1672 let CurrentTaskState {
1673 has_invalidator, ..
1674 } = &mut *cell.write().unwrap();
1675 *has_invalidator = true;
1676 })
1677}
1678
1679pub fn prevent_gc() {
1680 mark_stateful();
1682}
1683
1684pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1685 with_turbo_tasks(|tt| {
1686 let raw_vc = collectible.node.node;
1687 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1688 })
1689}
1690
1691pub(crate) async fn read_task_output(
1692 this: &dyn TurboTasksApi,
1693 id: TaskId,
1694 options: ReadOutputOptions,
1695) -> Result<RawVc> {
1696 loop {
1697 match this.try_read_task_output(id, options)? {
1698 Ok(result) => return Ok(result),
1699 Err(listener) => listener.await,
1700 }
1701 }
1702}
1703
1704pub(crate) async fn read_task_cell(
1705 this: &dyn TurboTasksApi,
1706 id: TaskId,
1707 index: CellId,
1708 options: ReadCellOptions,
1709) -> Result<TypedCellContent> {
1710 loop {
1711 match this.try_read_task_cell(id, index, options)? {
1712 Ok(result) => return Ok(result),
1713 Err(listener) => listener.await,
1714 }
1715 }
1716}
1717
1718#[derive(Clone, Copy, Serialize, Deserialize)]
1724pub struct CurrentCellRef {
1725 current_task: TaskId,
1726 index: CellId,
1727}
1728
1729type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1730
1731impl CurrentCellRef {
1732 pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1734 where
1735 T: VcValueType,
1736 {
1737 self.conditional_update_with_shared_reference(|old_shared_reference| {
1738 let old_ref = old_shared_reference
1739 .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1740 .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1741 let new_value = functor(old_ref)?;
1742 Some(SharedReference::new(triomphe::Arc::new(
1743 <T::Read as VcRead<T>>::value_to_repr(new_value),
1744 )))
1745 })
1746 }
1747
1748 pub fn conditional_update_with_shared_reference(
1750 &self,
1751 functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1752 ) {
1753 let tt = turbo_tasks();
1754 let cell_content = tt
1755 .read_own_task_cell(
1756 self.current_task,
1757 self.index,
1758 ReadCellOptions {
1759 tracking: ReadTracking::Untracked,
1761 ..Default::default()
1762 },
1763 )
1764 .ok();
1765 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1766 if let Some(update) = update {
1767 tt.update_own_task_cell(
1768 self.current_task,
1769 self.index,
1770 CellContent(Some(update)),
1771 VerificationMode::EqualityCheck,
1772 )
1773 }
1774 }
1775
1776 pub fn compare_and_update<T>(&self, new_value: T)
1810 where
1811 T: PartialEq + VcValueType,
1812 {
1813 self.conditional_update(|old_value| {
1814 if let Some(old_value) = old_value
1815 && old_value == &new_value
1816 {
1817 return None;
1818 }
1819 Some(new_value)
1820 });
1821 }
1822
1823 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1832 where
1833 T: VcValueType + PartialEq,
1834 {
1835 fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1836 <T::Read as VcRead<T>>::repr_to_value_ref(
1837 sr.0.downcast_ref::<VcReadRepr<T>>()
1838 .expect("cannot update SharedReference of different type"),
1839 )
1840 }
1841 self.conditional_update_with_shared_reference(|old_sr| {
1842 if let Some(old_sr) = old_sr {
1843 let old_value: &T = extract_sr_value(old_sr);
1844 let new_value = extract_sr_value(&new_shared_reference);
1845 if old_value == new_value {
1846 return None;
1847 }
1848 }
1849 Some(new_shared_reference)
1850 });
1851 }
1852
1853 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
1855 where
1856 T: VcValueType,
1857 {
1858 let tt = turbo_tasks();
1859 tt.update_own_task_cell(
1860 self.current_task,
1861 self.index,
1862 CellContent(Some(SharedReference::new(triomphe::Arc::new(
1863 <T::Read as VcRead<T>>::value_to_repr(new_value),
1864 )))),
1865 verification_mode,
1866 )
1867 }
1868
1869 pub fn update_with_shared_reference(
1878 &self,
1879 shared_ref: SharedReference,
1880 verification_mode: VerificationMode,
1881 ) {
1882 let tt = turbo_tasks();
1883 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
1884 let content = tt
1885 .read_own_task_cell(
1886 self.current_task,
1887 self.index,
1888 ReadCellOptions {
1889 tracking: ReadTracking::Untracked,
1891 ..Default::default()
1892 },
1893 )
1894 .ok();
1895 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1896 shared_ref_exp != shared_ref
1898 } else {
1899 true
1900 }
1901 } else {
1902 true
1903 };
1904 if update {
1905 tt.update_own_task_cell(
1906 self.current_task,
1907 self.index,
1908 CellContent(Some(shared_ref)),
1909 verification_mode,
1910 )
1911 }
1912 }
1913}
1914
1915impl From<CurrentCellRef> for RawVc {
1916 fn from(cell: CurrentCellRef) -> Self {
1917 RawVc::TaskCell(cell.current_task, cell.index)
1918 }
1919}
1920
1921pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
1922 CURRENT_TASK_STATE.with(|ts| {
1923 let current_task = current_task("celling turbo_tasks values");
1924 let mut ts = ts.write().unwrap();
1925 let map = ts.cell_counters.as_mut().unwrap();
1926 let current_index = map.entry(ty).or_default();
1927 let index = *current_index;
1928 *current_index += 1;
1929 CurrentCellRef {
1930 current_task,
1931 index: CellId { type_id: ty, index },
1932 }
1933 })
1934}
1935
1936pub(crate) async fn read_local_output(
1937 this: &dyn TurboTasksApi,
1938 execution_id: ExecutionId,
1939 local_task_id: LocalTaskId,
1940) -> Result<RawVc> {
1941 loop {
1942 match this.try_read_local_output(execution_id, local_task_id)? {
1943 Ok(raw_vc) => return Ok(raw_vc),
1944 Err(event_listener) => event_listener.await,
1945 }
1946 }
1947}