1use std::{
2 fmt::Display,
3 future::Future,
4 hash::BuildHasherDefault,
5 mem::take,
6 pin::Pin,
7 sync::{
8 Arc, Mutex, RwLock, Weak,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use tokio::{select, sync::mpsc::Receiver, task_local};
19use tokio_util::task::TaskTracker;
20use tracing::{Instrument, instrument};
21
22use crate::{
23 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
24 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
25 VcValueTrait, VcValueType,
26 backend::{
27 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
28 TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
29 },
30 capture_future::CaptureFuture,
31 event::{Event, EventListener},
32 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
33 id_factory::IdFactoryWithReuse,
34 macro_helpers::NativeFunction,
35 magic_any::MagicAny,
36 message_queue::{CompilationEvent, CompilationEventQueue},
37 raw_vc::{CellId, RawVc},
38 registry,
39 serialization_invalidation::SerializationInvalidator,
40 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
41 task_statistics::TaskStatisticsApi,
42 trace::TraceRawVcs,
43 util::{IdFactory, StaticOrArc},
44};
45
/// Object-safe entry points for calling turbo-tasks functions and for driving futures inside a
/// turbo-tasks context.
///
/// `TurboTasks<B>` implements this trait in this file by forwarding every method to its inherent
/// method of the same name.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with an optional `this` receiver and a boxed argument and returns the
    /// [`RawVc`] referring to the result.
    ///
    /// In the `TurboTasks` implementation, a receiver or argument that is not yet resolved causes
    /// the call to be routed through a local resolve task first.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` directly as a cached task.
    ///
    /// In the `TurboTasks` implementation the backend is asked to get or create a persistent or
    /// transient task, depending on `persistence`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on `this`.
    ///
    /// In the `TurboTasks` implementation the method is dispatched immediately when `this` is a
    /// task cell whose value type provides it; otherwise a local task resolves it later.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Drives `future` inside a turbo-tasks context; failures are reported as
    /// [`TurboTasksExecutionError`].
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
    /// Runs `future` once; `TurboTasks` executes it as a transient "once" task.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Like [`run_once`](Self::run_once), but `TurboTasks` first records `reason` in the set of
    /// reasons reported with the next aggregated update.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
    /// Starts `future` in the background without waiting for it; `TurboTasks` spawns it on the
    /// tokio runtime.
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
}
92
/// Object-safe API for working with tasks: invalidation, reading outputs and cells, collectibles,
/// and bookkeeping for the task that is currently executing.
///
/// `TurboTasks<B>` implements this trait; the methods visible in this file mostly forward to the
/// backend.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task`.
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason`; `TurboTasks` adds the reason to the set that is
    /// later handed out through [`UpdateInfo::reasons`].
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Invalidates the serialization of `task`.
    fn invalidate_serialization(&self, task: TaskId);

    /// Attempts to read the output of `task`.
    ///
    /// NOTE(review): the inner `Err(EventListener)` presumably means "not available yet, await
    /// the listener and retry" — confirm against the backend.
    fn try_read_task_output(
        &self,
        task: TaskId,
        options: ReadOutputOptions,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read the cell `index` of `task`.
    ///
    /// NOTE(review): same inner-`Err` convention as `try_read_task_output` is assumed — confirm.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Attempts to read the output of a local task created during execution `execution_id`.
    ///
    /// In the `TurboTasks` implementation this panics when `execution_id` differs from the current
    /// execution, returns `Ok(Err(listener))` while the local task is still scheduled, and
    /// returns the stored output once it is done.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of trait `trait_id` for `task`.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` for trait `trait_type` from the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Unemits `collectible` `count` times for trait `trait_type` from the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Unemits every entry of `collectibles`; `TurboTasks` skips entries whose count is not
    /// positive.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Reads cell `index` of `current_task`, which is expected to be the calling task itself.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads cell `index` of the calling task; `TurboTasks` delegates to
    /// [`try_read_own_task_cell`](Self::try_read_own_task_cell).
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    /// Replaces the content of cell `index` of the calling task.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
    );
    /// Marks the calling task as finished (forwarded to the backend by `TurboTasks`).
    fn mark_own_task_as_finished(&self, task: TaskId);
    /// Sets the aggregation number of the calling task (forwarded to the backend).
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    /// Marks the calling task as session dependent (forwarded to the backend).
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    /// Connects `task` to the current task, if there is one.
    fn connect_task(&self, task: TaskId);

    /// Wraps `f` so it can run detached from the calling task.
    ///
    /// NOTE(review): intended for tests only, judging by the name; the exact semantics are not
    /// visible in this part of the file.
    fn detached_for_testing(
        &self,
        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;

    /// Returns the task statistics handle.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Stops the instance and resolves once it has stopped.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Subscribes to compilation events.
    ///
    /// NOTE(review): `event_types` presumably filters by event type name, with `None` meaning
    /// "all events" — confirm against `CompilationEventQueue`.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;
    /// Publishes `event` to compilation-event subscribers.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);

    /// Reports whether dependency tracking is active.
    fn is_tracking_dependencies(&self) -> bool;
}
193
/// Wrapper around a value of type `T` whose construction and borrowing are gated behind `unsafe`.
///
/// NOTE(review): the name, and its use for fresh and reused task ids elsewhere in this file,
/// suggest the wrapped value must not be in use anywhere else; the exact contract is not spelled
/// out in the visible code — confirm.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner` without any checks.
    ///
    /// # Safety
    ///
    /// The caller must uphold whatever "unused" invariant the consumer of this wrapper relies on
    /// (presumably that `inner` is not referenced anywhere else — confirm).
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value.
    ///
    /// # Safety
    ///
    /// The caller must not use the borrowed value in a way that breaks the "unused" invariant
    /// (the precise rule is not visible here — confirm).
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
223
/// API that a [`Backend`] implementation `B` uses to talk back to the turbo-tasks instance that
/// owns it.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owning, type-erased handle to this instance.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Hands out a task id for a persistent task, wrapped in [`Unused`].
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Hands out a task id for a transient task, wrapped in [`Unused`].
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Gives a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not visible in this part of the file; presumably `id` must
    /// no longer be referenced anywhere — confirm.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Gives a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): same assumed contract as `reuse_persistent_task_id` — confirm.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution.
    fn schedule(&self, task: TaskId);

    /// Schedules a backend job that counts as foreground work.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Schedules a backend job that counts as background work.
    fn schedule_backend_background_job(&self, job: B::BackendJob);

    /// Returns a duration relative to the start of the program for `instant`.
    ///
    /// NOTE(review): presumably `instant - program_start`; the implementation is not visible
    /// here — confirm.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Reports whether the instance is currently idle.
    fn is_idle(&self) -> bool;

    /// Returns the backend.
    fn backend(&self) -> &B;
}
260
/// Summary of the foreground work aggregated since update info was last taken.
///
/// The private placeholder field prevents construction outside this module, which is why
/// `clippy::manual_non_exhaustive` is silenced.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated time during which foreground jobs were running.
    pub duration: Duration,
    /// Number of tasks scheduled during that time.
    pub tasks: usize,
    /// Reasons recorded by `invalidate_with_reason` and `run_once_with_reason`.
    pub reasons: InvalidationReasonSet,
    // Keeps the struct non-constructible from outside so fields can be added later.
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
269
/// Selects which kind of task a call creates.
///
/// `TurboTasks::native_call` maps the variants to the backend's
/// `get_or_create_persistent_task` and `get_or_create_transient_task` respectively.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum TaskPersistence {
    /// The call is backed by a persistent task.
    Persistent,

    /// The call is backed by a transient task.
    Transient,
}
283
284impl Display for TaskPersistence {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 match self {
287 TaskPersistence::Persistent => write!(f, "persistent"),
288 TaskPersistence::Transient => write!(f, "transient"),
289 }
290 }
291}
292
/// Consistency level requested when reading a task's output.
///
/// NOTE(review): the precise guarantees of each level are defined by the backend and are not
/// visible in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadConsistency {
    /// The default consistency level.
    #[default]
    Eventual,
    /// The stronger consistency level.
    Strong,
}
305
/// Controls whether a read is tracked.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum ReadTracking {
    /// Every read is tracked (the default).
    #[default]
    Tracked,
    /// Only reads that produced an error are tracked.
    TrackOnlyError,
    /// Reads are never tracked.
    Untracked,
}

impl ReadTracking {
    /// Decides whether a read should be tracked, given whether its result was an error.
    pub fn should_track(&self, is_err: bool) -> bool {
        match *self {
            Self::Untracked => false,
            Self::TrackOnlyError => is_err,
            Self::Tracked => true,
        }
    }
}

/// Renders the tracking mode as a short lowercase phrase.
impl Display for ReadTracking {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Untracked => "untracked",
            Self::TrackOnlyError => "track only error",
            Self::Tracked => "tracked",
        };
        f.write_str(label)
    }
}
342
/// The turbo-tasks instance: owns the backend, the id factories and the counters/events used to
/// track foreground and background work.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference; `pin` upgrades it to an `Arc<Self>`.
    this: Weak<Self>,
    backend: B,
    /// Ids for persistent tasks: `TaskId::MIN` up to `TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Ids for transient tasks: `TRANSIENT_TASK_BIT` up to `TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Produces one id per task execution (and per `run` call).
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; checked before a task or job starts running.
    stopped: AtomicBool,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of `schedule` calls since the foreground job count last dropped to zero.
    scheduled_tasks: AtomicUsize,
    /// Instant at which the foreground job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` plus the recorded invalidation reasons; drained by
    /// `aggregated_update_info`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the foreground job count goes from zero to one.
    event_foreground_start: Event,
    /// Notified when the foreground job count drops back to zero.
    event_foreground_done: Event,
    /// Notified when the background job count drops back to zero.
    event_background_done: Event,
    /// Instant at which this instance was constructed.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
365
/// Per-execution state, stored in the `CURRENT_TASK_STATE` task-local while a task (or a `run`
/// future) is executing.
struct CurrentTaskState {
    /// Id of the executing task; `None` for the temporary state created by `TurboTasks::run`.
    task_id: Option<TaskId>,
    /// Id of this particular execution; local tasks may only be used within the same execution.
    execution_id: ExecutionId,

    /// Reported to the backend via `task_execution_completed` when the execution finishes.
    stateful: bool,

    /// Reported to the backend via `task_execution_completed` when the execution finishes.
    has_invalidator: bool,

    /// Counters keyed by value type; `None` for temporary state, and taken out of the state when
    /// the execution completes.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created during this execution; a `LocalTaskId` is the one-based index into
    /// this vector.
    local_tasks: Vec<LocalTask>,

    /// Tracks the futures of spawned local tasks so the execution can wait for all of them
    /// before it reports completion.
    local_task_tracker: TaskTracker,
}
397
398impl CurrentTaskState {
399 fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
400 Self {
401 task_id: Some(task_id),
402 execution_id,
403 stateful: false,
404 has_invalidator: false,
405 cell_counters: Some(AutoMap::default()),
406 local_tasks: Vec::new(),
407 local_task_tracker: TaskTracker::new(),
408 }
409 }
410
411 fn new_temporary(execution_id: ExecutionId) -> Self {
412 Self {
413 task_id: None,
414 execution_id,
415 stateful: false,
416 has_invalidator: false,
417 cell_counters: None,
418 local_tasks: Vec::new(),
419 local_task_tracker: TaskTracker::new(),
420 }
421 }
422
423 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
424 if self.execution_id != expected_execution_id {
425 panic!(
426 "Local tasks can only be scheduled/awaited within the same execution of the \
427 parent task that created them"
428 );
429 }
430 }
431
432 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
433 self.local_tasks.push(local_task);
434 if cfg!(debug_assertions) {
436 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
437 } else {
438 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
439 }
440 }
441
442 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
443 &self.local_tasks[(*local_task_id as usize) - 1]
445 }
446
447 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
448 &mut self.local_tasks[(*local_task_id as usize) - 1]
449 }
450}
451
// Task-local context that is scoped around every future spawned by `TurboTasks`.
task_local! {
    /// The turbo-tasks instance the current future is running under.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    /// State of the execution the current future belongs to; shared with the local tasks that
    /// execution spawns.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}
459
460impl<B: Backend + 'static> TurboTasks<B> {
461 pub fn new(backend: B) -> Arc<Self> {
467 let task_id_factory = IdFactoryWithReuse::new(
468 TaskId::MIN,
469 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
470 );
471 let transient_task_id_factory =
472 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
473 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
474 let this = Arc::new_cyclic(|this| Self {
475 this: this.clone(),
476 backend,
477 task_id_factory,
478 transient_task_id_factory,
479 execution_id_factory,
480 stopped: AtomicBool::new(false),
481 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
482 currently_scheduled_background_jobs: AtomicUsize::new(0),
483 scheduled_tasks: AtomicUsize::new(0),
484 start: Default::default(),
485 aggregated_update: Default::default(),
486 event_foreground_done: Event::new(|| {
487 || "TurboTasks::event_foreground_done".to_string()
488 }),
489 event_foreground_start: Event::new(|| {
490 || "TurboTasks::event_foreground_start".to_string()
491 }),
492 event_background_done: Event::new(|| {
493 || "TurboTasks::event_background_done".to_string()
494 }),
495 program_start: Instant::now(),
496 compilation_events: CompilationEventQueue::default(),
497 });
498 this.backend.startup(&*this);
499 this
500 }
501
502 pub fn pin(&self) -> Arc<Self> {
503 self.this.upgrade().unwrap()
504 }
505
506 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
508 where
509 T: ?Sized,
510 F: Fn() -> Fut + Send + Sync + Clone + 'static,
511 Fut: Future<Output = Result<Vc<T>>> + Send,
512 {
513 let id = self.backend.create_transient_task(
514 TransientTaskType::Root(Box::new(move || {
515 let functor = functor.clone();
516 Box::pin(async move {
517 let raw_vc = functor().await?.node;
518 raw_vc.to_non_local().await
519 })
520 })),
521 self,
522 );
523 self.schedule(id);
524 id
525 }
526
527 pub fn dispose_root_task(&self, task_id: TaskId) {
528 self.backend.dispose_root_task(task_id, self);
529 }
530
531 #[track_caller]
535 fn spawn_once_task<T, Fut>(&self, future: Fut)
536 where
537 T: ?Sized,
538 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
539 {
540 let id = self.backend.create_transient_task(
541 TransientTaskType::Once(Box::pin(async move {
542 let raw_vc = future.await?.node;
543 raw_vc.to_non_local().await
544 })),
545 self,
546 );
547 self.schedule(id);
548 }
549
550 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
551 &self,
552 future: impl Future<Output = Result<T>> + Send + 'static,
553 ) -> Result<T> {
554 let (tx, rx) = tokio::sync::oneshot::channel();
555 self.spawn_once_task(async move {
556 let result = future.await;
557 tx.send(result)
558 .map_err(|_| anyhow!("unable to send result"))?;
559 Ok(Completion::new())
560 });
561
562 rx.await?
563 }
564
565 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
566 pub async fn run<T: TraceRawVcs + Send + 'static>(
567 &self,
568 future: impl Future<Output = Result<T>> + Send + 'static,
569 ) -> Result<T, TurboTasksExecutionError> {
570 self.begin_foreground_job();
571 let execution_id = self.execution_id_factory.wrapping_get();
573 let current_task_state =
574 Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
575
576 let result = TURBO_TASKS
577 .scope(
578 self.pin(),
579 CURRENT_TASK_STATE.scope(current_task_state, async {
580 let result = CaptureFuture::new(future).await;
581
582 let ltt =
584 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
585 ltt.close();
586 ltt.wait().await;
587
588 match result {
589 Ok(Ok(raw_vc)) => Ok(raw_vc),
590 Ok(Err(err)) => Err(err.into()),
591 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
592 }
593 }),
594 )
595 .await;
596 self.finish_foreground_job();
597 result
598 }
599
600 pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
601 let this = self.pin();
602 tokio::spawn(async move {
603 this.pin()
604 .run_once(async move {
605 this.finish_foreground_job();
606 future.await;
607 this.begin_foreground_job();
608 Ok(())
609 })
610 .await
611 .unwrap()
612 });
613 }
614
615 pub(crate) fn native_call(
616 &self,
617 native_fn: &'static NativeFunction,
618 this: Option<RawVc>,
619 arg: Box<dyn MagicAny>,
620 persistence: TaskPersistence,
621 ) -> RawVc {
622 let task_type = CachedTaskType {
623 native_fn,
624 this,
625 arg,
626 };
627 RawVc::TaskOutput(match persistence {
628 TaskPersistence::Transient => self.backend.get_or_create_transient_task(
629 task_type,
630 current_task_if_available("turbo_function calls"),
631 self,
632 ),
633 TaskPersistence::Persistent => self.backend.get_or_create_persistent_task(
634 task_type,
635 current_task_if_available("turbo_function calls"),
636 self,
637 ),
638 })
639 }
640
641 pub fn dynamic_call(
642 &self,
643 native_fn: &'static NativeFunction,
644 this: Option<RawVc>,
645 arg: Box<dyn MagicAny>,
646 persistence: TaskPersistence,
647 ) -> RawVc {
648 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
649 return self.native_call(native_fn, this, arg, persistence);
650 }
651 let task_type = LocalTaskSpec {
652 task_type: LocalTaskType::ResolveNative { native_fn },
653 this,
654 arg,
655 };
656 self.schedule_local_task(task_type, persistence)
657 }
658
659 pub fn trait_call(
660 &self,
661 trait_method: &'static TraitMethod,
662 this: RawVc,
663 arg: Box<dyn MagicAny>,
664 persistence: TaskPersistence,
665 ) -> RawVc {
666 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
670 match registry::get_value_type(type_id).get_trait_method(trait_method) {
671 Some(native_fn) => {
672 let arg = native_fn.arg_meta.filter_owned(arg);
673 return self.dynamic_call(native_fn, Some(this), arg, persistence);
674 }
675 None => {
676 }
680 }
681 }
682
683 let task_type = LocalTaskSpec {
685 task_type: LocalTaskType::ResolveTrait { trait_method },
686 this: Some(this),
687 arg,
688 };
689
690 self.schedule_local_task(task_type, persistence)
691 }
692
    /// Schedules `task_id` for execution on the tokio runtime.
    ///
    /// The spawned future counts as one foreground job. It executes the task repeatedly for as
    /// long as the backend's `task_execution_completed` returns `true`, using a fresh execution
    /// id and a fresh `CurrentTaskState` for every run.
    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId) {
        self.begin_foreground_job();
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        let this = self.pin();
        let future = async move {
            let mut schedule_again = true;
            while schedule_again {
                let execution_id = this.execution_id_factory.wrapping_get();
                let current_task_state =
                    Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
                let single_execution_future = async {
                    // After `stop_and_wait` no new executions are started.
                    if this.stopped.load(Ordering::Acquire) {
                        this.backend.task_execution_canceled(task_id, &*this);
                        return false;
                    }

                    // The backend may decline to start the execution.
                    let Some(TaskExecutionSpec { future, span }) =
                        this.backend.try_start_task_execution(task_id, &*this)
                    else {
                        return false;
                    };

                    async {
                        let result = CaptureFuture::new(future).await;

                        // Wait for all local tasks spawned by this execution before reporting
                        // completion.
                        let ltt = CURRENT_TASK_STATE
                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
                        ltt.close();
                        ltt.wait().await;

                        let result = match result {
                            Ok(Ok(raw_vc)) => Ok(raw_vc),
                            Ok(Err(err)) => Err(err.into()),
                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                        };

                        let FinishedTaskState {
                            stateful,
                            has_invalidator,
                        } = this.finish_current_task_state();
                        // Task executions always start with `Some` cell counters (see
                        // `CurrentTaskState::new`), so the `unwrap` holds here.
                        let cell_counters = CURRENT_TASK_STATE
                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                        // The return value decides whether the task runs again.
                        this.backend.task_execution_completed(
                            task_id,
                            result,
                            &cell_counters,
                            stateful,
                            has_invalidator,
                            &*this,
                        )
                    }
                    .instrument(span)
                    .await
                };
                schedule_again = CURRENT_TASK_STATE
                    .scope(current_task_state, single_execution_future)
                    .await;
            }
            this.finish_foreground_job();
            anyhow::Ok(())
        };

        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

        // With `tokio_tracing` the spawned task is named after the task description.
        #[cfg(feature = "tokio_tracing")]
        {
            let description = self.backend.get_task_description(task_id);
            tokio::task::Builder::new()
                .name(&description)
                .spawn(future)
                .unwrap();
        }
        #[cfg(not(feature = "tokio_tracing"))]
        tokio::task::spawn(future);
    }
772
    /// Registers a local task in the current execution's state, spawns the future that resolves
    /// it, and returns a `RawVc::LocalOutput` referring to its eventual output.
    ///
    /// The spawned future is tracked by the execution's `local_task_tracker` and runs with the
    /// same `CURRENT_TASK_STATE` as the caller. When it finishes, the local task's entry is
    /// switched from `Scheduled` to `Done` and everyone waiting on its `done_event` is notified.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a `CURRENT_TASK_STATE` scope. The spawned future panics when
    /// the local task is not in the `Scheduled` state on completion.
    fn schedule_local_task(
        &self,
        ty: LocalTaskSpec,
        persistence: TaskPersistence,
    ) -> RawVc {
        let task_type = ty.task_type;
        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
            .with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new(move || {
                        move || format!("LocalTask({task_type})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.task_id,
                    gts_write.execution_id,
                    local_task_id,
                )
            });

        #[cfg(feature = "tokio_tracing")]
        let description = format!(
            "[local] (parent: {}) {}",
            self.backend.get_task_description(parent_task_id),
            ty.task_type,
        );
        // Without `tokio_tracing` the parent id is unused; this silences the warning.
        #[cfg(not(feature = "tokio_tracing"))]
        let _ = parent_task_id;

        let this = self.pin();
        let future = async move {
            let span = match &ty.task_type {
                LocalTaskType::ResolveNative { native_fn } => native_fn.resolve_span(),
                LocalTaskType::ResolveTrait { trait_method } => trait_method.resolve_span(),
            };
            async move {
                let result = match ty.task_type {
                    LocalTaskType::ResolveNative { native_fn } => {
                        LocalTaskType::run_resolve_native(
                            native_fn,
                            ty.this,
                            &*ty.arg,
                            persistence,
                            this,
                        )
                        .await
                    }
                    LocalTaskType::ResolveTrait { trait_method } => {
                        LocalTaskType::run_resolve_trait(
                            trait_method,
                            // `trait_call` always builds `ResolveTrait` specs with `Some(this)`.
                            ty.this.unwrap(),
                            &*ty.arg,
                            persistence,
                            this,
                        )
                        .await
                    }
                };

                let output = match result {
                    Ok(raw_vc) => OutputContent::Link(raw_vc),
                    Err(err) => OutputContent::Error(
                        TurboTasksExecutionError::from(err).with_task_context(task_type, None),
                    ),
                };

                let local_task = LocalTask::Done { output };

                // Swap the finished state in and take the event out while holding the lock;
                // notify only after the lock has been released.
                let done_event = CURRENT_TASK_STATE.with(move |gts| {
                    let mut gts_write = gts.write().unwrap();
                    let scheduled_task =
                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
                    let LocalTask::Scheduled { done_event } = scheduled_task else {
                        panic!("local task finished, but was not in the scheduled state?");
                    };
                    done_event
                });
                done_event.notify(usize::MAX)
            }
            .instrument(span)
            .await
        };
        // Track the future so the owning execution can wait for it before completing.
        let future = global_task_state
            .read()
            .unwrap()
            .local_task_tracker
            .track_future(future);
        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

        #[cfg(feature = "tokio_tracing")]
        tokio::task::Builder::new()
            .name(&description)
            .spawn(future)
            .unwrap();
        #[cfg(not(feature = "tokio_tracing"))]
        tokio::task::spawn(future);

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }
876
877 fn begin_foreground_job(&self) {
878 if self
879 .currently_scheduled_foreground_jobs
880 .fetch_add(1, Ordering::AcqRel)
881 == 0
882 {
883 *self.start.lock().unwrap() = Some(Instant::now());
884 self.event_foreground_start.notify(usize::MAX);
885 self.backend.idle_end(self);
886 }
887 }
888
889 fn finish_foreground_job(&self) {
890 if self
891 .currently_scheduled_foreground_jobs
892 .fetch_sub(1, Ordering::AcqRel)
893 == 1
894 {
895 self.backend.idle_start(self);
896 let total = self.scheduled_tasks.load(Ordering::Acquire);
899 self.scheduled_tasks.store(0, Ordering::Release);
900 if let Some(start) = *self.start.lock().unwrap() {
901 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
902 if let Some(update) = update.as_mut() {
903 update.0 += start.elapsed();
904 update.1 += total;
905 } else {
906 *update = Some((start.elapsed(), total));
907 }
908 }
909 self.event_foreground_done.notify(usize::MAX);
910 }
911 }
912
913 fn begin_background_job(&self) {
914 self.currently_scheduled_background_jobs
915 .fetch_add(1, Ordering::Relaxed);
916 }
917
918 fn finish_background_job(&self) {
919 if self
920 .currently_scheduled_background_jobs
921 .fetch_sub(1, Ordering::Relaxed)
922 == 1
923 {
924 self.event_background_done.notify(usize::MAX);
925 }
926 }
927
928 pub fn get_in_progress_count(&self) -> usize {
929 self.currently_scheduled_foreground_jobs
930 .load(Ordering::Acquire)
931 }
932
933 pub async fn wait_task_completion(
945 &self,
946 id: TaskId,
947 consistency: ReadConsistency,
948 ) -> Result<()> {
949 read_task_output(
950 self,
951 id,
952 ReadOutputOptions {
953 tracking: ReadTracking::Untracked,
955 consistency,
956 },
957 )
958 .await?;
959 Ok(())
960 }
961
962 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
965 self.aggregated_update_info(aggregation, Duration::MAX)
966 .await
967 .unwrap()
968 }
969
    /// Waits for foreground work to finish and returns the aggregated update info.
    ///
    /// * `aggregation` — quiet period: after an update is available, keep waiting until no
    ///   further foreground work has finished for this long. Zero returns as soon as an update
    ///   exists.
    /// * `timeout` — `Duration::MAX` waits indefinitely. Any other value first waits for
    ///   foreground work to start (if none is running) and then at most `timeout` for it to
    ///   finish; `None` is returned on timeout.
    ///
    /// # Panics
    ///
    /// Panics when no update is stored after waiting, which happens when this method is called
    /// concurrently.
    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        // Register the listener before inspecting the state so a notification sent in between is
        // not missed.
        let listener = self
            .event_foreground_done
            .listen_with_note(|| || "wait for update info".to_string());
        let wait_for_finish = {
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                // Without aggregation an existing update is returned immediately.
                if let Some((duration, tasks)) = update.take() {
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                // Wait for the foreground work to finish, however long it takes.
                listener.await;
            } else {
                // Wait for foreground work to start, then for it to finish or time out.
                let start_listener = self
                    .event_foreground_start
                    .listen_with_note(|| || "wait for update info".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    == 0
                {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    // Timed out.
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
                        // More foreground work finished: the next loop iteration restarts the
                        // quiet-period sleep.
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not called concurrently")
        }
    }
1046
1047 pub async fn wait_background_done(&self) {
1048 let listener = self.event_background_done.listen();
1049 if self
1050 .currently_scheduled_background_jobs
1051 .load(Ordering::Acquire)
1052 != 0
1053 {
1054 listener.await;
1055 }
1056 }
1057
    /// Stops the instance: notifies the backend that it is stopping, sets the `stopped` flag,
    /// waits for foreground and then background jobs to drain, and finally calls the backend's
    /// `stop` hook.
    pub async fn stop_and_wait(&self) {
        turbo_tasks_future_scope(self.pin(), async move {
            self.backend.stopping(self);
            // From here on, scheduled tasks and jobs see the flag and do not start.
            self.stopped.store(true, Ordering::Release);
            {
                // Register the listener before reading the counter so a notification sent in
                // between is not missed.
                let listener = self
                    .event_foreground_done
                    .listen_with_note(|| || "wait for stop".to_string());
                if self
                    .currently_scheduled_foreground_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            {
                let listener = self.event_background_done.listen();
                if self
                    .currently_scheduled_background_jobs
                    .load(Ordering::Acquire)
                    != 0
                {
                    listener.await;
                }
            }
            self.backend.stop(self);
        })
        .await;
    }
1088
1089 #[track_caller]
1090 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1091 where
1092 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1093 T::CallOnceFuture: Send,
1094 {
1095 let mut this = self.pin();
1096 this.begin_foreground_job();
1097 tokio::spawn(
1098 TURBO_TASKS
1099 .scope(this.clone(), async move {
1100 if !this.stopped.load(Ordering::Acquire) {
1101 this = func(this.clone()).await;
1102 }
1103 this.finish_foreground_job();
1104 })
1105 .in_current_span(),
1106 );
1107 }
1108
1109 #[track_caller]
1110 pub(crate) fn schedule_background_job<T>(&self, func: T)
1111 where
1112 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1113 T::CallOnceFuture: Send,
1114 {
1115 let mut this = self.pin();
1116 self.begin_background_job();
1117 tokio::spawn(
1118 TURBO_TASKS
1119 .scope(this.clone(), async move {
1120 if !this.stopped.load(Ordering::Acquire) {
1121 this = func(this).await;
1122 }
1123 this.finish_background_job();
1124 })
1125 .in_current_span(),
1126 );
1127 }
1128
1129 fn finish_current_task_state(&self) -> FinishedTaskState {
1130 let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1131 let CurrentTaskState {
1132 stateful,
1133 has_invalidator,
1134 ..
1135 } = &mut *cell.write().unwrap();
1136 (*stateful, *has_invalidator)
1137 });
1138
1139 FinishedTaskState {
1140 stateful,
1141 has_invalidator,
1142 }
1143 }
1144
    /// Returns a reference to the backend owned by this instance.
    pub fn backend(&self) -> &B {
        &self.backend
    }
1148}
1149
/// Flags copied out of `CurrentTaskState` when a task execution finishes; both are passed on to
/// the backend's `task_execution_completed`.
struct FinishedTaskState {
    /// Value of `CurrentTaskState::stateful` at the end of the execution.
    stateful: bool,

    /// Value of `CurrentTaskState::has_invalidator` at the end of the execution.
    has_invalidator: bool,
}
1157
/// Object-safe facade over the inherent methods of `TurboTasks`.
///
/// Each call such as `self.dynamic_call(..)` below resolves to the inherent method of the same
/// name (inherent methods take precedence over trait methods), so these do not recurse.
impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    /// Forwards to the inherent `TurboTasks::dynamic_call`.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `TurboTasks::native_call`.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(native_fn, this, arg, persistence)
    }
    /// Forwards to the inherent `TurboTasks::trait_call`.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_method, this, arg, persistence)
    }

    /// Boxes a future that runs `future` through the inherent `TurboTasks::run`; the returned
    /// future keeps the instance alive through a pinned `Arc`.
    #[track_caller]
    fn run(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run(future).await })
    }

    /// Boxes a future that runs `future` through the inherent `TurboTasks::run_once`.
    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Records `reason` in the aggregated update's reason set, then behaves like `run_once`.
    ///
    /// The reason is recorded immediately, not when the returned future is polled.
    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        {
            // The lock is released at the end of this block, before the future is created.
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        let this = self.pin();
        Box::pin(async move { this.run_once(future).await })
    }

    /// Forwards to the inherent `TurboTasks::start_once_process`.
    #[track_caller]
    fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        self.start_once_process(future)
    }
}
1224
1225impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1226 #[instrument(level = "info", skip_all, name = "invalidate")]
1227 fn invalidate(&self, task: TaskId) {
1228 self.backend.invalidate_task(task, self);
1229 }
1230
1231 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1232 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1233 {
1234 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1235 reason_set.insert(reason);
1236 }
1237 self.backend.invalidate_task(task, self);
1238 }
1239
1240 fn invalidate_serialization(&self, task: TaskId) {
1241 self.backend.invalidate_serialization(task, self);
1242 }
1243
1244 fn try_read_task_output(
1245 &self,
1246 task: TaskId,
1247 options: ReadOutputOptions,
1248 ) -> Result<Result<RawVc, EventListener>> {
1249 self.backend.try_read_task_output(
1250 task,
1251 current_task_if_available("reading Vcs"),
1252 options,
1253 self,
1254 )
1255 }
1256
1257 fn try_read_task_cell(
1258 &self,
1259 task: TaskId,
1260 index: CellId,
1261 options: ReadCellOptions,
1262 ) -> Result<Result<TypedCellContent, EventListener>> {
1263 self.backend.try_read_task_cell(
1264 task,
1265 index,
1266 current_task_if_available("reading Vcs"),
1267 options,
1268 self,
1269 )
1270 }
1271
1272 fn try_read_own_task_cell(
1273 &self,
1274 current_task: TaskId,
1275 index: CellId,
1276 options: ReadCellOptions,
1277 ) -> Result<TypedCellContent> {
1278 self.backend
1279 .try_read_own_task_cell(current_task, index, options, self)
1280 }
1281
1282 fn try_read_local_output(
1283 &self,
1284 execution_id: ExecutionId,
1285 local_task_id: LocalTaskId,
1286 ) -> Result<Result<RawVc, EventListener>> {
1287 CURRENT_TASK_STATE.with(|gts| {
1288 let gts_read = gts.read().unwrap();
1289
1290 gts_read.assert_execution_id(execution_id);
1295
1296 match gts_read.get_local_task(local_task_id) {
1297 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1298 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1299 }
1300 })
1301 }
1302
1303 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1304 self.backend.read_task_collectibles(
1305 task,
1306 trait_id,
1307 current_task_if_available("reading collectibles"),
1308 self,
1309 )
1310 }
1311
1312 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1313 self.backend.emit_collectible(
1314 trait_type,
1315 collectible,
1316 current_task("emitting collectible"),
1317 self,
1318 );
1319 }
1320
1321 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1322 self.backend.unemit_collectible(
1323 trait_type,
1324 collectible,
1325 count,
1326 current_task("emitting collectible"),
1327 self,
1328 );
1329 }
1330
1331 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1332 for (&collectible, &count) in collectibles {
1333 if count > 0 {
1334 self.backend.unemit_collectible(
1335 trait_type,
1336 collectible,
1337 count as u32,
1338 current_task("emitting collectible"),
1339 self,
1340 );
1341 }
1342 }
1343 }
1344
    /// Reads cell `index` of `task`; a direct alias for
    /// `try_read_own_task_cell` with the same arguments.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent> {
        self.try_read_own_task_cell(task, index, options)
    }
1353
    /// Writes `content` into cell `index` of `task` by delegating to the
    /// backend's `update_task_cell`, forwarding `verification_mode` unchanged.
    fn update_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
    ) {
        self.backend
            .update_task_cell(task, index, content, verification_mode, self);
    }
1364
1365 fn connect_task(&self, task: TaskId) {
1366 self.backend
1367 .connect_task(task, current_task_if_available("connecting task"), self);
1368 }
1369
    /// Forwards to the backend's `mark_own_task_as_finished` for `task`.
    fn mark_own_task_as_finished(&self, task: TaskId) {
        self.backend.mark_own_task_as_finished(task, self);
    }
1373
    /// Forwards `aggregation_number` for `task` to the backend's
    /// `set_own_task_aggregation_number`.
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
        self.backend
            .set_own_task_aggregation_number(task, aggregation_number, self);
    }
1378
    /// Forwards to the backend's `mark_own_task_as_session_dependent` for `task`.
    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
        self.backend.mark_own_task_as_session_dependent(task, self);
    }
1382
1383 fn detached_for_testing(
1386 &self,
1387 fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1388 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1389 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1392 let tracked_fut = {
1393 let ts = global_task_state.read().unwrap();
1394 ts.local_task_tracker.track_future(fut)
1395 };
1396 Box::pin(TURBO_TASKS.scope(
1397 turbo_tasks(),
1398 CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1399 ))
1400 }
1401
    /// Returns the backend's task statistics handle.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        self.backend.task_statistics()
    }
1405
1406 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1407 let this = self.pin();
1408 Box::pin(async move {
1409 this.stop_and_wait().await;
1410 })
1411 }
1412
    /// Subscribes to the compilation event queue, forwarding the optional
    /// `event_types` filter, and returns the receiving end.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>> {
        self.compilation_events.subscribe(event_types)
    }
1419
1420 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1421 if let Err(e) = self.compilation_events.send(event) {
1422 tracing::warn!("Failed to send compilation event: {e}");
1423 }
1424 }
1425
    /// Reports the backend's answer to `is_tracking_dependencies`.
    fn is_tracking_dependencies(&self) -> bool {
        self.backend.is_tracking_dependencies()
    }
1429}
1430
1431impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1432 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1433 self.pin()
1434 }
1435 fn backend(&self) -> &B {
1436 &self.backend
1437 }
1438
1439 #[track_caller]
1440 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1441 self.schedule_background_job(async move |this| {
1442 this.backend.run_backend_job(job, &*this).await;
1443 this
1444 })
1445 }
1446
1447 #[track_caller]
1448 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1449 self.schedule_foreground_job(async move |this| {
1450 this.backend.run_backend_job(job, &*this).await;
1451 this
1452 })
1453 }
1454
1455 #[track_caller]
1456 fn schedule(&self, task: TaskId) {
1457 self.schedule(task)
1458 }
1459
1460 fn program_duration_until(&self, instant: Instant) -> Duration {
1461 instant - self.program_start
1462 }
1463
1464 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1465 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1467 }
1468
1469 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1470 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1472 }
1473
1474 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1475 unsafe { self.task_id_factory.reuse(id.into()) }
1476 }
1477
1478 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1479 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1480 }
1481
1482 fn is_idle(&self) -> bool {
1483 self.currently_scheduled_foreground_jobs
1484 .load(Ordering::Acquire)
1485 == 0
1486 }
1487}
1488
1489pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1490 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1491 Ok(id) => id,
1492 Err(_) => panic!(
1493 "{from} can only be used in the context of a turbo_tasks task execution or \
1494 turbo_tasks run"
1495 ),
1496 }
1497}
1498
1499pub(crate) fn current_task(from: &str) -> TaskId {
1500 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1501 Ok(Some(id)) => id,
1502 Ok(None) | Err(_) => {
1503 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1504 }
1505 }
1506}
1507
1508pub async fn run<T: Send + 'static>(
1509 tt: Arc<dyn TurboTasksApi>,
1510 future: impl Future<Output = Result<T>> + Send + 'static,
1511) -> Result<T> {
1512 let (tx, rx) = tokio::sync::oneshot::channel();
1513
1514 tt.run(Box::pin(async move {
1515 let result = future.await?;
1516 tx.send(result)
1517 .map_err(|_| anyhow!("unable to send result"))?;
1518 Ok(())
1519 }))
1520 .await?;
1521
1522 Ok(rx.await?)
1523}
1524
1525pub async fn run_once<T: Send + 'static>(
1526 tt: Arc<dyn TurboTasksApi>,
1527 future: impl Future<Output = Result<T>> + Send + 'static,
1528) -> Result<T> {
1529 let (tx, rx) = tokio::sync::oneshot::channel();
1530
1531 tt.run_once(Box::pin(async move {
1532 let result = future.await?;
1533 tx.send(result)
1534 .map_err(|_| anyhow!("unable to send result"))?;
1535 Ok(())
1536 }))
1537 .await?;
1538
1539 Ok(rx.await?)
1540}
1541
1542pub async fn run_once_with_reason<T: Send + 'static>(
1543 tt: Arc<dyn TurboTasksApi>,
1544 reason: impl InvalidationReason,
1545 future: impl Future<Output = Result<T>> + Send + 'static,
1546) -> Result<T> {
1547 let (tx, rx) = tokio::sync::oneshot::channel();
1548
1549 tt.run_once_with_reason(
1550 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1551 Box::pin(async move {
1552 let result = future.await?;
1553 tx.send(result)
1554 .map_err(|_| anyhow!("unable to send result"))?;
1555 Ok(())
1556 }),
1557 )
1558 .await?;
1559
1560 Ok(rx.await?)
1561}
1562
/// Invokes `dynamic_call` on the ambient turbo_tasks instance with the given
/// function, optional `this`, argument and persistence, returning its `RawVc`.
pub fn dynamic_call(
    func: &'static NativeFunction,
    this: Option<RawVc>,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
}
1572
/// Invokes `trait_call` on the ambient turbo_tasks instance with the given
/// trait method, `this`, argument and persistence, returning its `RawVc`.
pub fn trait_call(
    trait_method: &'static TraitMethod,
    this: RawVc,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
}
1582
/// Returns a clone of the `Arc` stored in the `TURBO_TASKS` task-local.
pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
    TURBO_TASKS.with(|arc| arc.clone())
}
1586
1587pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1588 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1589}
1590
1591pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1592 TURBO_TASKS.with(|arc| func(arc))
1593}
1594
/// Runs the synchronous closure `f` with `tt` installed as the `TURBO_TASKS`
/// task-local value (via `sync_scope`) and returns the closure's result.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1598
/// Wraps the future `f` so that it runs with `tt` installed as the
/// `TURBO_TASKS` task-local value (via `scope`).
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1605
1606pub fn with_turbo_tasks_for_testing<T>(
1607 tt: Arc<dyn TurboTasksApi>,
1608 current_task: TaskId,
1609 execution_id: ExecutionId,
1610 f: impl Future<Output = T>,
1611) -> impl Future<Output = T> {
1612 TURBO_TASKS.scope(
1613 tt,
1614 CURRENT_TASK_STATE.scope(
1615 Arc::new(RwLock::new(CurrentTaskState::new(
1616 current_task,
1617 execution_id,
1618 ))),
1619 f,
1620 ),
1621 )
1622}
1623
1624pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1629 tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1630}
1631
/// Returns the task id recorded in the current task state, if any.
pub fn current_task_for_testing() -> Option<TaskId> {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1635
1636pub fn mark_session_dependent() {
1638 with_turbo_tasks(|tt| {
1639 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1640 });
1641}
1642
1643pub fn mark_root() {
1646 with_turbo_tasks(|tt| {
1647 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1648 });
1649}
1650
1651pub fn mark_finished() {
1654 with_turbo_tasks(|tt| {
1655 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1656 });
1657}
1658
1659pub fn mark_stateful() -> SerializationInvalidator {
1665 CURRENT_TASK_STATE.with(|cell| {
1666 let CurrentTaskState {
1667 stateful, task_id, ..
1668 } = &mut *cell.write().unwrap();
1669 *stateful = true;
1670 let Some(task_id) = *task_id else {
1671 panic!(
1672 "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1673 );
1674 };
1675 SerializationInvalidator::new(task_id)
1676 })
1677}
1678
1679pub fn mark_invalidator() {
1680 CURRENT_TASK_STATE.with(|cell| {
1681 let CurrentTaskState {
1682 has_invalidator, ..
1683 } = &mut *cell.write().unwrap();
1684 *has_invalidator = true;
1685 })
1686}
1687
/// Calls `mark_stateful` and discards the invalidator it returns.
///
/// NOTE(review): presumably a stateful task is exempt from garbage
/// collection, which the name suggests — confirm against the backend.
pub fn prevent_gc() {
    mark_stateful();
}
1692
1693pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1694 with_turbo_tasks(|tt| {
1695 let raw_vc = collectible.node.node;
1696 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1697 })
1698}
1699
1700pub(crate) async fn read_task_output(
1701 this: &dyn TurboTasksApi,
1702 id: TaskId,
1703 options: ReadOutputOptions,
1704) -> Result<RawVc> {
1705 loop {
1706 match this.try_read_task_output(id, options)? {
1707 Ok(result) => return Ok(result),
1708 Err(listener) => listener.await,
1709 }
1710 }
1711}
1712
1713pub(crate) async fn read_task_cell(
1714 this: &dyn TurboTasksApi,
1715 id: TaskId,
1716 index: CellId,
1717 options: ReadCellOptions,
1718) -> Result<TypedCellContent> {
1719 loop {
1720 match this.try_read_task_cell(id, index, options)? {
1721 Ok(result) => return Ok(result),
1722 Err(listener) => listener.await,
1723 }
1724 }
1725}
1726
/// Identifies a single cell by the id of the task that owns it and the
/// cell's id within that task.
///
/// NOTE(review): presumably always refers to a cell of the currently
/// executing task (that is how `find_cell_by_type` builds it) — confirm.
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct CurrentCellRef {
    /// Task that owns the cell.
    current_task: TaskId,
    /// Id of the cell (value type plus per-type index) within that task.
    index: CellId,
}
1737
/// Shorthand for the `Repr` type of `T`'s `VcRead` implementation.
type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1739
1740impl CurrentCellRef {
1741 pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1743 where
1744 T: VcValueType,
1745 {
1746 self.conditional_update_with_shared_reference(|old_shared_reference| {
1747 let old_ref = old_shared_reference
1748 .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1749 .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1750 let new_value = functor(old_ref)?;
1751 Some(SharedReference::new(triomphe::Arc::new(
1752 <T::Read as VcRead<T>>::value_to_repr(new_value),
1753 )))
1754 })
1755 }
1756
1757 pub fn conditional_update_with_shared_reference(
1759 &self,
1760 functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1761 ) {
1762 let tt = turbo_tasks();
1763 let cell_content = tt
1764 .read_own_task_cell(
1765 self.current_task,
1766 self.index,
1767 ReadCellOptions {
1768 tracking: ReadTracking::Untracked,
1770 ..Default::default()
1771 },
1772 )
1773 .ok();
1774 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1775 if let Some(update) = update {
1776 tt.update_own_task_cell(
1777 self.current_task,
1778 self.index,
1779 CellContent(Some(update)),
1780 VerificationMode::EqualityCheck,
1781 )
1782 }
1783 }
1784
1785 pub fn compare_and_update<T>(&self, new_value: T)
1819 where
1820 T: PartialEq + VcValueType,
1821 {
1822 self.conditional_update(|old_value| {
1823 if let Some(old_value) = old_value
1824 && old_value == &new_value
1825 {
1826 return None;
1827 }
1828 Some(new_value)
1829 });
1830 }
1831
1832 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1841 where
1842 T: VcValueType + PartialEq,
1843 {
1844 fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1845 <T::Read as VcRead<T>>::repr_to_value_ref(
1846 sr.0.downcast_ref::<VcReadRepr<T>>()
1847 .expect("cannot update SharedReference of different type"),
1848 )
1849 }
1850 self.conditional_update_with_shared_reference(|old_sr| {
1851 if let Some(old_sr) = old_sr {
1852 let old_value: &T = extract_sr_value(old_sr);
1853 let new_value = extract_sr_value(&new_shared_reference);
1854 if old_value == new_value {
1855 return None;
1856 }
1857 }
1858 Some(new_shared_reference)
1859 });
1860 }
1861
1862 pub fn update<T>(&self, new_value: T, verification_mode: VerificationMode)
1864 where
1865 T: VcValueType,
1866 {
1867 let tt = turbo_tasks();
1868 tt.update_own_task_cell(
1869 self.current_task,
1870 self.index,
1871 CellContent(Some(SharedReference::new(triomphe::Arc::new(
1872 <T::Read as VcRead<T>>::value_to_repr(new_value),
1873 )))),
1874 verification_mode,
1875 )
1876 }
1877
1878 pub fn update_with_shared_reference(
1887 &self,
1888 shared_ref: SharedReference,
1889 verification_mode: VerificationMode,
1890 ) {
1891 let tt = turbo_tasks();
1892 let update = if matches!(verification_mode, VerificationMode::EqualityCheck) {
1893 let content = tt
1894 .read_own_task_cell(
1895 self.current_task,
1896 self.index,
1897 ReadCellOptions {
1898 tracking: ReadTracking::Untracked,
1900 ..Default::default()
1901 },
1902 )
1903 .ok();
1904 if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1905 shared_ref_exp != shared_ref
1907 } else {
1908 true
1909 }
1910 } else {
1911 true
1912 };
1913 if update {
1914 tt.update_own_task_cell(
1915 self.current_task,
1916 self.index,
1917 CellContent(Some(shared_ref)),
1918 verification_mode,
1919 )
1920 }
1921 }
1922}
1923
1924impl From<CurrentCellRef> for RawVc {
1925 fn from(cell: CurrentCellRef) -> Self {
1926 RawVc::TaskCell(cell.current_task, cell.index)
1927 }
1928}
1929
1930pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
1931 CURRENT_TASK_STATE.with(|ts| {
1932 let current_task = current_task("celling turbo_tasks values");
1933 let mut ts = ts.write().unwrap();
1934 let map = ts.cell_counters.as_mut().unwrap();
1935 let current_index = map.entry(ty).or_default();
1936 let index = *current_index;
1937 *current_index += 1;
1938 CurrentCellRef {
1939 current_task,
1940 index: CellId { type_id: ty, index },
1941 }
1942 })
1943}
1944
1945pub(crate) async fn read_local_output(
1946 this: &dyn TurboTasksApi,
1947 execution_id: ExecutionId,
1948 local_task_id: LocalTaskId,
1949) -> Result<RawVc> {
1950 loop {
1951 match this.try_read_local_output(execution_id, local_task_id)? {
1952 Ok(raw_vc) => return Ok(raw_vc),
1953 Err(event_listener) => event_listener.await,
1954 }
1955 }
1956}