1use std::{
2 any::Any,
3 future::Future,
4 hash::BuildHasherDefault,
5 mem::take,
6 pin::Pin,
7 sync::{
8 Arc, Mutex, RwLock, Weak,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use smallvec::SmallVec;
19use tokio::{select, sync::mpsc::Receiver, task_local};
20use tokio_util::task::TaskTracker;
21use tracing::{Instrument, Level, instrument, trace_span};
22
23use crate::{
24 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
25 ResolvedVc, SharedReference, TaskId, TaskIdSet, TraitMethod, ValueTypeId, Vc, VcRead,
26 VcValueTrait, VcValueType,
27 backend::{
28 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
29 TransientTaskType, TurboTasksExecutionError, TypedCellContent,
30 },
31 capture_future::CaptureFuture,
32 event::{Event, EventListener},
33 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
34 id_factory::IdFactoryWithReuse,
35 macro_helpers::NativeFunction,
36 magic_any::MagicAny,
37 message_queue::{CompilationEvent, CompilationEventQueue},
38 raw_vc::{CellId, RawVc},
39 registry,
40 serialization_invalidation::SerializationInvalidator,
41 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
42 task_statistics::TaskStatisticsApi,
43 trace::TraceRawVcs,
44 util::{IdFactory, StaticOrArc},
45 vc::ReadVcFuture,
46};
47
/// Dynamically dispatchable entry points for invoking turbo-tasks functions and for spawning
/// one-off tasks.
///
/// [`TurboTasks`] implements this trait further down in this file; the per-method notes describe
/// what that implementation does.
pub trait TurboTasksCallApi: Sync + Send {
    /// Calls `native_fn` with receiver `this` and argument `arg` and returns a [`RawVc`] for the
    /// result.
    ///
    /// The [`TurboTasks`] implementation forwards to `native_call` when both `this` and `arg`
    /// are already resolved, and otherwise schedules a local `ResolveNative` task.
    fn dynamic_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `native_fn` without the resolution check performed by `dynamic_call`.
    ///
    /// The [`TurboTasks`] implementation schedules a local task for [`TaskPersistence::Local`]
    /// and otherwise asks the backend to get or create a transient/persistent task.
    fn native_call(
        &self,
        native_fn: &'static NativeFunction,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    /// Calls `trait_method` on `this`.
    ///
    /// The [`TurboTasks`] implementation looks the method up directly when `this` is a
    /// `RawVc::TaskCell`, and otherwise schedules a local `ResolveTrait` task.
    fn trait_call(
        &self,
        trait_method: &'static TraitMethod,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    /// Spawns `future` as a once-task and returns the id of that task.
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId;
    /// Like `run_once`, but the [`TurboTasks`] implementation first records `reason` in the set
    /// of invalidation reasons that is reported through `UpdateInfo`.
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId;
    /// Like `run_once`, but the [`TurboTasks`] implementation does not count the task as a
    /// primary job while `future` is running.
    fn run_once_process(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId;
}
93
/// The main, dynamically dispatchable API of the task manager: invalidation, reading task
/// outputs/cells, collectibles, and own-task bookkeeping.
///
/// [`TurboTasks`] implements this trait in this file; most methods forward to the backend. The
/// per-method notes describe that implementation where it is visible.
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    /// Invalidates `task` (forwarded to the backend).
    fn invalidate(&self, task: TaskId);
    /// Invalidates `task` and records `reason` in the aggregated invalidation reasons.
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    /// Forwards to the backend's `invalidate_serialization` for `task`.
    fn invalidate_serialization(&self, task: TaskId);

    /// Takes the tasks queued in the current task state's `tasks_to_notify` list and passes them
    /// to the backend's `invalidate_tasks`. Does nothing outside of a task context.
    fn notify_scheduled_tasks(&self);

    /// Tries to read the output of `task`, passing the currently executing task to the backend
    /// as the reader. The inner `Err` carries a listener to wait on before retrying.
    fn try_read_task_output(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Like `try_read_task_output`, but no reader task is passed to the backend.
    fn try_read_task_output_untracked(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Tries to read cell `index` of `task`, passing the currently executing task to the
    /// backend as the reader.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Like `try_read_task_cell`, but no reader task is passed to the backend.
    fn try_read_task_cell_untracked(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads the output of a local task owned by the currently executing task.
    ///
    /// The [`TurboTasks`] implementation panics when `execution_id` does not match the current
    /// execution, returns a listener while the local task is still scheduled, and otherwise
    /// returns its output.
    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Reads the collectibles of type `trait_id` from `task`, with the current task as reader.
    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    /// Emits `collectible` of type `trait_type` on behalf of the current task.
    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    /// Un-emits `count` instances of `collectible` on behalf of the current task.
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    /// Un-emits every entry of `collectibles` that has a positive count.
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    /// Reads cell `index` of `current_task` via the backend's
    /// `try_read_own_task_cell_untracked`.
    fn try_read_own_task_cell_untracked(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    /// Reads cell `index` of `task`; the [`TurboTasks`] implementation delegates to
    /// `try_read_own_task_cell_untracked`.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    // NOTE(review): the implementations of the remaining methods are outside this view; the
    // names suggest own-task bookkeeping, but confirm against the implementation.
    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
    fn mark_own_task_as_finished(&self, task: TaskId);
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    fn connect_task(&self, task: TaskId);

    /// Wraps `f` in another boxed future. NOTE(review): presumably intended for tests only, as
    /// the name says; the wrapping behavior is not visible here.
    fn detached_for_testing(
        &self,
        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;

    /// Returns the task statistics API object.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Returns a future that stops the system and resolves once that is done.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Returns a receiver for compilation events. NOTE(review): presumably `event_types`
    /// filters by event type name and `None` means "all" — confirm.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;
    /// Sends a compilation event.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
}
206
/// Wrapper around a value whose construction and borrowing are `unsafe`.
///
/// NOTE(review): in this file it wraps freshly allocated task ids
/// (`get_fresh_persistent_task_id` / `reuse_persistent_task_id`); presumably it marks a value
/// that is not yet in use anywhere — confirm the exact contract.
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; presumably the caller must
    /// guarantee that `inner` is not in use elsewhere.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without consuming the wrapper.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must not start "using" the value through this
    /// reference — confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
236
/// The API the task manager exposes to a [`Backend`] implementation `B`.
///
/// NOTE(review): the implementation of this trait is outside the visible part of the file, so
/// the per-method notes below are derived from the signatures and should be confirmed.
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    /// Returns an owned, reference-counted handle to this API object.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    /// Returns a task id wrapped in [`Unused`]; presumably from the persistent id range.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    /// Returns a task id wrapped in [`Unused`]; presumably from the transient id range.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    /// Hands a persistent task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): contract not visible here; presumably `id` must no longer be in use.
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    /// Hands a transient task id back for reuse.
    ///
    /// # Safety
    ///
    /// NOTE(review): contract not visible here; presumably `id` must no longer be in use.
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    /// Schedules `task` for execution.
    fn schedule(&self, task: TaskId);
    /// Schedules a backend-defined job as a background job.
    fn schedule_backend_background_job(&self, job: B::BackendJob);
    /// Schedules a backend-defined job as a foreground job.
    fn schedule_backend_foreground_job(&self, job: B::BackendJob);

    /// Returns `Ok(())` or a listener to wait on. NOTE(review): presumably `Ok` means no
    /// foreground jobs are pending — confirm.
    fn try_foreground_done(&self) -> Result<(), EventListener>;
    /// Optionally returns a future to await. NOTE(review): presumably it waits for foreground
    /// jobs other than the caller's own and is `None` when there is nothing to wait for.
    fn wait_foreground_done_excluding_own<'a>(
        &'a self,
    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>>;

    /// Queues `tasks` for notification. NOTE(review): presumably these end up in the current
    /// task state's `tasks_to_notify` list — confirm.
    fn schedule_notify_tasks(&self, tasks: &[TaskId]);

    /// Same as `schedule_notify_tasks`, taking a [`TaskIdSet`].
    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet);

    /// Returns a duration for `instant`. NOTE(review): presumably measured from the manager's
    /// `program_start` — confirm.
    fn program_duration_until(&self, instant: Instant) -> Duration;

    /// Calls `func` with a shared reference to the backend task state. `dyn`-friendly variant
    /// of `TurboTasksBackendApiExt::read_task_state`, which expects `func` to be called.
    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState));

    /// Calls `func` with an exclusive reference to the backend task state. `dyn`-friendly
    /// variant of `TurboTasksBackendApiExt::write_task_state`, which expects `func` to be
    /// called.
    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState));

    /// Returns whether the system is idle. NOTE(review): exact definition not visible here.
    fn is_idle(&self) -> bool;

    /// Returns a reference to the backend.
    fn backend(&self) -> &B;
}
286
287pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
290 fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
295 let mut func = Some(func);
296 let mut out = None;
297 self.read_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
298 out.expect("read_task_state_dyn must call `func`")
299 }
300
301 fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
306 let mut func = Some(func);
307 let mut out = None;
308 self.write_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
309 out.expect("write_task_state_dyn must call `func`")
310 }
311}
312
/// Blanket implementation: every `TurboTasksBackendApi<B>` (including unsized ones such as
/// trait objects) gets the `TurboTasksBackendApiExt` helper methods.
impl<TT, B> TurboTasksBackendApiExt<B> for TT
where
    TT: TurboTasksBackendApi<B> + ?Sized,
    B: Backend + 'static,
{
}
319
/// Summary of the work done since the last report, as returned by
/// `TurboTasks::aggregated_update_info`.
// The private unit field below prevents construction outside this module, which is what the
// `manual_non_exhaustive` lint would otherwise flag.
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    /// Accumulated time of the busy periods covered by this report.
    pub duration: Duration,
    /// Accumulated number of `schedule` calls covered by this report.
    pub tasks: usize,
    /// Invalidation reasons recorded since the previous report.
    pub reasons: InvalidationReasonSet,
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}
328
/// Selects which kind of task a function call is turned into (see `TurboTasks::native_call`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum TaskPersistence {
    /// `native_call` asks the backend to get or create a persistent task.
    Persistent,

    /// `native_call` asks the backend to get or create a transient task.
    Transient,

    /// `native_call` schedules a local task inside the currently executing task.
    Local,
}
352
/// Consistency level requested when reading a task output; passed through to the backend.
///
/// NOTE(review): the precise guarantees of each level are defined by the backend and are not
/// visible in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadConsistency {
    /// Used by `TurboTasks::run_once` when reading the output of the task it just spawned.
    Eventual,
    /// The stronger of the two levels, judging by the name — confirm semantics in the backend.
    Strong,
}
364
/// The task manager: owns the backend, allocates ids, and tracks scheduled work.
pub struct TurboTasks<B: Backend + 'static> {
    /// Weak self-reference set up by `Arc::new_cyclic` in `new`; upgraded by `pin`.
    this: Weak<Self>,
    backend: B,
    /// Id factory covering `TaskId::MIN ..= TRANSIENT_TASK_BIT - 1`.
    task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Id factory covering `TRANSIENT_TASK_BIT ..= TaskId::MAX`.
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    /// Produces a new execution id for every execution started in `schedule`.
    execution_id_factory: IdFactory<ExecutionId>,
    /// Set by `stop_and_wait`; checked before executing tasks and jobs.
    stopped: AtomicBool,
    /// Number of primary jobs in flight (`begin_primary_job` / `finish_primary_job`).
    currently_scheduled_tasks: AtomicUsize,
    /// Number of foreground jobs in flight.
    currently_scheduled_foreground_jobs: AtomicUsize,
    /// Number of background jobs in flight.
    currently_scheduled_background_jobs: AtomicUsize,
    /// Number of `schedule` calls since the counter was last reset in `finish_primary_job`.
    scheduled_tasks: AtomicUsize,
    /// Time at which the primary job count last went from zero to one.
    start: Mutex<Option<Instant>>,
    /// Accumulated `(duration, task count)` plus invalidation reasons, drained by
    /// `aggregated_update_info`.
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    /// Notified when the primary job count drops to zero.
    event: Event,
    /// Notified when the primary job count goes from zero to one.
    event_start: Event,
    /// Notified when the foreground job count drops to zero.
    event_foreground: Event,
    /// Notified when the background job count drops to zero.
    event_background: Event,
    /// Captured when the instance is created in `new`.
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}
385
/// Per-execution state of a task; installed in the `CURRENT_TASK_STATE` task-local by
/// `schedule` and shared with the local tasks spawned during that execution.
struct CurrentTaskState {
    task_id: TaskId,
    /// Id of this execution; checked by `assert_execution_id`.
    execution_id: ExecutionId,

    /// Tasks queued for invalidation; drained by `finish_current_task_state` and
    /// `notify_scheduled_tasks`, which pass them to the backend's `invalidate_tasks`.
    tasks_to_notify: SmallVec<[TaskId; 4]>,

    /// Reported to the backend's `task_execution_completed`.
    stateful: bool,

    /// Reported to the backend's `task_execution_completed`.
    has_invalidator: bool,

    /// Per-value-type counters; taken exactly once at the end of an execution and passed to
    /// the backend's `task_execution_completed`.
    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    /// Local tasks created during this execution, indexed by `LocalTaskId - 1`.
    local_tasks: Vec<LocalTask>,

    /// Tracks the spawned local task futures so `schedule` can wait for all of them.
    local_task_tracker: TaskTracker,

    /// Opaque state created by the backend's `new_task_state`.
    backend_state: Box<dyn Any + Send + Sync>,
}
424
425impl CurrentTaskState {
426 fn new(
427 task_id: TaskId,
428 execution_id: ExecutionId,
429 backend_state: Box<dyn Any + Send + Sync>,
430 ) -> Self {
431 Self {
432 task_id,
433 execution_id,
434 tasks_to_notify: SmallVec::new(),
435 stateful: false,
436 has_invalidator: false,
437 cell_counters: Some(AutoMap::default()),
438 local_tasks: Vec::new(),
439 local_task_tracker: TaskTracker::new(),
440 backend_state,
441 }
442 }
443
444 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
445 if self.execution_id != expected_execution_id {
446 panic!(
447 "Local tasks can only be scheduled/awaited within the same execution of the \
448 parent task that created them"
449 );
450 }
451 }
452
453 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
454 self.local_tasks.push(local_task);
455 if cfg!(debug_assertions) {
457 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
458 } else {
459 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
460 }
461 }
462
463 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
464 &self.local_tasks[(*local_task_id as usize) - 1]
466 }
467
468 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
469 &mut self.local_tasks[(*local_task_id as usize) - 1]
470 }
471}
472
task_local! {
    // The task manager instance; set for every spawned task and job via `TURBO_TASKS.scope`.
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    // State of the task execution currently running; set by `schedule` and shared with the
    // local tasks spawned by `schedule_local_task`.
    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}
480
481impl<B: Backend + 'static> TurboTasks<B> {
482 pub fn new(backend: B) -> Arc<Self> {
488 let task_id_factory = IdFactoryWithReuse::new(
489 TaskId::MIN,
490 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
491 );
492 let transient_task_id_factory =
493 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
494 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
495 let this = Arc::new_cyclic(|this| Self {
496 this: this.clone(),
497 backend,
498 task_id_factory,
499 transient_task_id_factory,
500 execution_id_factory,
501 stopped: AtomicBool::new(false),
502 currently_scheduled_tasks: AtomicUsize::new(0),
503 currently_scheduled_background_jobs: AtomicUsize::new(0),
504 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
505 scheduled_tasks: AtomicUsize::new(0),
506 start: Default::default(),
507 aggregated_update: Default::default(),
508 event: Event::new(|| || "TurboTasks::event".to_string()),
509 event_start: Event::new(|| || "TurboTasks::event_start".to_string()),
510 event_foreground: Event::new(|| || "TurboTasks::event_foreground".to_string()),
511 event_background: Event::new(|| || "TurboTasks::event_background".to_string()),
512 program_start: Instant::now(),
513 compilation_events: CompilationEventQueue::default(),
514 });
515 this.backend.startup(&*this);
516 this
517 }
518
/// Returns a strong reference to this instance.
///
/// # Panics
///
/// Panics if the weak self-reference can no longer be upgraded.
pub fn pin(&self) -> Arc<Self> {
    self.this.upgrade().unwrap()
}
522
523 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
525 where
526 T: ?Sized,
527 F: Fn() -> Fut + Send + Sync + Clone + 'static,
528 Fut: Future<Output = Result<Vc<T>>> + Send,
529 {
530 let id = self.backend.create_transient_task(
531 TransientTaskType::Root(Box::new(move || {
532 let functor = functor.clone();
533 Box::pin(async move {
534 let raw_vc = functor().await?.node;
535 raw_vc.to_non_local().await
536 })
537 })),
538 self,
539 );
540 self.schedule(id);
541 id
542 }
543
/// Forwards to the backend's `dispose_root_task` for `task_id`.
pub fn dispose_root_task(&self, task_id: TaskId) {
    self.backend.dispose_root_task(task_id, self);
}
547
548 #[track_caller]
552 pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
553 where
554 T: ?Sized,
555 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
556 {
557 let id = self.backend.create_transient_task(
558 TransientTaskType::Once(Box::pin(async move {
559 let raw_vc = future.await?.node;
560 raw_vc.to_non_local().await
561 })),
562 self,
563 );
564 self.schedule(id);
565 id
566 }
567
568 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
569 &self,
570 future: impl Future<Output = Result<T>> + Send + 'static,
571 ) -> Result<T> {
572 let (tx, rx) = tokio::sync::oneshot::channel();
573 let task_id = self.spawn_once_task(async move {
574 let result = future.await?;
575 tx.send(result)
576 .map_err(|_| anyhow!("unable to send result"))?;
577 Ok(Completion::new())
578 });
579 let raw_result =
582 read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
583 turbo_tasks_future_scope(
584 self.pin(),
585 ReadVcFuture::<Completion>::from(raw_result.into_read().untracked()),
586 )
587 .await?;
588
589 Ok(rx.await?)
590 }
591
592 pub(crate) fn native_call(
593 &self,
594 native_fn: &'static NativeFunction,
595 this: Option<RawVc>,
596 arg: Box<dyn MagicAny>,
597 persistence: TaskPersistence,
598 ) -> RawVc {
599 match persistence {
600 TaskPersistence::Local => {
601 let task_type = LocalTaskSpec {
602 task_type: LocalTaskType::Native { native_fn },
603 this,
604 arg,
605 };
606 self.schedule_local_task(task_type, persistence)
607 }
608 TaskPersistence::Transient => {
609 let task_type = CachedTaskType {
610 native_fn,
611 this,
612 arg,
613 };
614
615 RawVc::TaskOutput(self.backend.get_or_create_transient_task(
616 task_type,
617 current_task("turbo_function calls"),
618 self,
619 ))
620 }
621 TaskPersistence::Persistent => {
622 let task_type = CachedTaskType {
623 native_fn,
624 this,
625 arg,
626 };
627
628 RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
629 task_type,
630 current_task("turbo_function calls"),
631 self,
632 ))
633 }
634 }
635 }
636
637 pub fn dynamic_call(
638 &self,
639 native_fn: &'static NativeFunction,
640 this: Option<RawVc>,
641 arg: Box<dyn MagicAny>,
642 persistence: TaskPersistence,
643 ) -> RawVc {
644 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
645 return self.native_call(native_fn, this, arg, persistence);
646 }
647 let task_type = LocalTaskSpec {
648 task_type: LocalTaskType::ResolveNative { native_fn },
649 this,
650 arg,
651 };
652 self.schedule_local_task(task_type, persistence)
653 }
654
655 pub fn trait_call(
656 &self,
657 trait_method: &'static TraitMethod,
658 this: RawVc,
659 arg: Box<dyn MagicAny>,
660 persistence: TaskPersistence,
661 ) -> RawVc {
662 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
666 match registry::get_value_type(type_id).get_trait_method(trait_method) {
667 Some(native_fn) => {
668 let arg = native_fn.arg_meta.filter_owned(arg);
669 return self.dynamic_call(native_fn, Some(this), arg, persistence);
670 }
671 None => {
672 }
676 }
677 }
678
679 let task_type = LocalTaskSpec {
681 task_type: LocalTaskType::ResolveTrait { trait_method },
682 this: Some(this),
683 arg,
684 };
685
686 self.schedule_local_task(task_type, persistence)
687 }
688
/// Spawns a tokio task that executes `task_id`, re-executing it for as long as the backend's
/// `task_execution_completed` returns `true`.
///
/// The task counts as a primary job from this call until the spawned future finishes.
#[track_caller]
pub(crate) fn schedule(&self, task_id: TaskId) {
    self.begin_primary_job();
    self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

    let this = self.pin();
    let future = async move {
        let mut schedule_again = true;
        while schedule_again {
            // Every execution gets its own backend state, execution id and task state.
            let backend_state = this.backend.new_task_state(task_id);
            let execution_id = this.execution_id_factory.wrapping_get();
            let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                task_id,
                execution_id,
                Box::new(backend_state),
            )));
            let single_execution_future = async {
                // After `stop_and_wait` nothing is executed any more; the backend is told.
                if this.stopped.load(Ordering::Acquire) {
                    this.backend.task_execution_canceled(task_id, &*this);
                    return false;
                }

                // The backend may decline to start the execution.
                let Some(TaskExecutionSpec { future, span }) =
                    this.backend.try_start_task_execution(task_id, &*this)
                else {
                    return false;
                };

                async {
                    let (result, duration, alloc_info) = CaptureFuture::new(future).await;

                    // Wait for all local tasks spawned during this execution to finish.
                    let ltt = CURRENT_TASK_STATE
                        .with(|ts| ts.read().unwrap().local_task_tracker.clone());
                    ltt.close();
                    ltt.wait().await;

                    // Outer `Err` is mapped to a panic error, inner `Err` is the task's error.
                    let result = match result {
                        Ok(Ok(raw_vc)) => Ok(raw_vc),
                        Ok(Err(err)) => Err(err.into()),
                        Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
                    };

                    this.backend.task_execution_result(task_id, result, &*this);
                    let FinishedTaskState {
                        stateful,
                        has_invalidator,
                    } = this.finish_current_task_state();
                    // The counters are taken exactly once per execution, hence the `unwrap`.
                    let cell_counters = CURRENT_TASK_STATE
                        .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                    let schedule_again = this.backend.task_execution_completed(
                        task_id,
                        duration,
                        alloc_info.memory_usage(),
                        &cell_counters,
                        stateful,
                        has_invalidator,
                        &*this,
                    );
                    // Flush notifications queued while completing the execution.
                    this.notify_scheduled_tasks();
                    schedule_again
                }
                .instrument(span)
                .await
            };
            schedule_again = CURRENT_TASK_STATE
                .scope(current_task_state, single_execution_future)
                .await;
        }
        this.finish_primary_job();
        anyhow::Ok(())
    };

    let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

    #[cfg(feature = "tokio_tracing")]
    {
        let description = self.backend.get_task_description(task_id);
        tokio::task::Builder::new()
            .name(&description)
            .spawn(future)
            .unwrap();
    }
    #[cfg(not(feature = "tokio_tracing"))]
    tokio::task::spawn(future);
}
777
/// Registers a local task in the current task state, spawns it on tokio, and returns a
/// `RawVc::LocalOutput` referring to it.
///
/// Must be called from within a task execution (it reads `CURRENT_TASK_STATE`).
fn schedule_local_task(
    &self,
    ty: LocalTaskSpec,
    persistence: TaskPersistence,
) -> RawVc {
    let task_type = ty.task_type;
    // Register the task as `Scheduled` and grab everything needed to run it elsewhere.
    let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
        .with(|gts| {
            let mut gts_write = gts.write().unwrap();
            let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                done_event: Event::new(move || {
                    move || format!("LocalTask({task_type})::done_event")
                }),
            });
            (
                Arc::clone(gts),
                gts_write.task_id,
                gts_write.execution_id,
                local_task_id,
            )
        });

    #[cfg(feature = "tokio_tracing")]
    let description = format!(
        "[local] (parent: {}) {}",
        self.backend.get_task_description(parent_task_id),
        ty.task_type,
    );
    #[cfg(not(feature = "tokio_tracing"))]
    let _ = parent_task_id; // only used with `tokio_tracing`

    let this = self.pin();
    let future = async move {
        let TaskExecutionSpec { future, span } =
            crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
        async move {
            let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;

            // Outer `Err` is mapped to a panic error, inner `Err` is the task's error.
            let result = match result {
                Ok(Ok(raw_vc)) => Ok(raw_vc),
                Ok(Err(err)) => Err(err.into()),
                Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
            };

            let local_task = LocalTask::Done {
                output: match result {
                    Ok(raw_vc) => OutputContent::Link(raw_vc),
                    Err(err) => OutputContent::Error(err.with_task_context(task_type)),
                },
            };

            // Swap the `Scheduled` entry for the `Done` one, then wake everyone waiting.
            let done_event = CURRENT_TASK_STATE.with(move |gts| {
                let mut gts_write = gts.write().unwrap();
                let scheduled_task =
                    std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
                let LocalTask::Scheduled { done_event } = scheduled_task else {
                    panic!("local task finished, but was not in the scheduled state?");
                };
                done_event
            });
            done_event.notify(usize::MAX)
        }
        .instrument(span)
        .await
    };
    // Track the future so the parent execution in `schedule` can wait for it.
    let future = global_task_state
        .read()
        .unwrap()
        .local_task_tracker
        .track_future(future);
    // The local task shares the parent's task state and the task manager task-local.
    let future = CURRENT_TASK_STATE.scope(global_task_state, future);
    let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

    #[cfg(feature = "tokio_tracing")]
    tokio::task::Builder::new()
        .name(&description)
        .spawn(future)
        .unwrap();
    #[cfg(not(feature = "tokio_tracing"))]
    tokio::task::spawn(future);

    RawVc::LocalOutput(execution_id, local_task_id, persistence)
}
868
869 fn begin_primary_job(&self) {
870 if self
871 .currently_scheduled_tasks
872 .fetch_add(1, Ordering::AcqRel)
873 == 0
874 {
875 *self.start.lock().unwrap() = Some(Instant::now());
876 self.event_start.notify(usize::MAX);
877 self.backend.idle_end(self);
878 }
879 }
880
/// Registers a foreground job; every foreground job also counts as a primary job.
fn begin_foreground_job(&self) {
    self.begin_primary_job();
    self.currently_scheduled_foreground_jobs
        .fetch_add(1, Ordering::AcqRel);
}
886
887 fn finish_primary_job(&self) {
888 if self
889 .currently_scheduled_tasks
890 .fetch_sub(1, Ordering::AcqRel)
891 == 1
892 {
893 self.backend.idle_start(self);
894 let total = self.scheduled_tasks.load(Ordering::Acquire);
897 self.scheduled_tasks.store(0, Ordering::Release);
898 if let Some(start) = *self.start.lock().unwrap() {
899 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
900 if let Some(update) = update.as_mut() {
901 update.0 += start.elapsed();
902 update.1 += total;
903 } else {
904 *update = Some((start.elapsed(), total));
905 }
906 }
907 self.event.notify(usize::MAX);
908 }
909 }
910
911 fn finish_foreground_job(&self) {
912 if self
913 .currently_scheduled_foreground_jobs
914 .fetch_sub(1, Ordering::AcqRel)
915 == 1
916 {
917 self.event_foreground.notify(usize::MAX);
918 }
919 self.finish_primary_job();
920 }
921
922 pub async fn wait_foreground_done(&self) {
923 if self
924 .currently_scheduled_foreground_jobs
925 .load(Ordering::Acquire)
926 == 0
927 {
928 return;
929 }
930 let listener = self.event_foreground.listen();
931 if self
932 .currently_scheduled_foreground_jobs
933 .load(Ordering::Acquire)
934 == 0
935 {
936 return;
937 }
938 listener
939 .instrument(trace_span!("wait_foreground_done"))
940 .await;
941 }
942
/// Returns the number of primary jobs currently in flight.
pub fn get_in_progress_count(&self) -> usize {
    self.currently_scheduled_tasks.load(Ordering::Acquire)
}
946
/// Waits until the output of task `id` can be read with the given `consistency`, discarding
/// the output itself. Errors from reading the output are propagated.
pub async fn wait_task_completion(
    &self,
    id: TaskId,
    consistency: ReadConsistency,
) -> Result<()> {
    read_task_output_untracked(self, id, consistency).await?;
    Ok(())
}
967
/// Like `aggregated_update_info`, but waits without a timeout.
///
/// `aggregated_update_info` only returns `None` from its timeout branch, which is not taken
/// for `Duration::MAX`, so the `unwrap` is expected to succeed.
pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
    self.aggregated_update_info(aggregation, Duration::MAX)
        .await
        .unwrap()
}
975
/// Returns the accumulated [`UpdateInfo`] and resets the accumulator.
///
/// * `aggregation`: when non-zero, keeps waiting until no `event` notification has arrived for
///   this long, so several bursts of work are reported as one update.
/// * `timeout`: how long to wait for a busy period to finish once it has started;
///   `Duration::MAX` waits indefinitely.
///
/// Returns `None` when the timeout elapses (or is zero) before an update is available.
///
/// # Panics
///
/// Panics when no update is available at the end, which happens if another caller took it
/// concurrently.
pub async fn aggregated_update_info(
    &self,
    aggregation: Duration,
    timeout: Duration,
) -> Option<UpdateInfo> {
    // Listen before inspecting the state so a notification in between is not missed.
    let listener = self
        .event
        .listen_with_note(|| || "wait for update info".to_string());
    let wait_for_finish = {
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if aggregation.is_zero() {
            // Without aggregation an already available update is returned right away.
            if let Some((duration, tasks)) = update.take() {
                return Some(UpdateInfo {
                    duration,
                    tasks,
                    reasons: take(reason_set),
                    placeholder_for_future_fields: (),
                });
            } else {
                true
            }
        } else {
            update.is_none()
        }
    };
    if wait_for_finish {
        if timeout == Duration::MAX {
            // Wait for the busy period to finish, however long it takes.
            listener.await;
        } else {
            // Wait for work to start (unless it is already running), then wait for it to
            // finish, bounded by `timeout`.
            let start_listener = self
                .event_start
                .listen_with_note(|| || "wait for update info".to_string());
            if self.currently_scheduled_tasks.load(Ordering::Acquire) == 0 {
                start_listener.await;
            } else {
                drop(start_listener);
            }
            if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                // Timed out.
                return None;
            }
        }
    }
    if !aggregation.is_zero() {
        loop {
            select! {
                () = tokio::time::sleep(aggregation) => {
                    break;
                }
                () = self.event.listen_with_note(|| || "wait for update info".to_string()) => {
                    // Another notification arrived: start a fresh sleep.
                }
            }
        }
    }
    let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
    if let Some((duration, tasks)) = update.take() {
        Some(UpdateInfo {
            duration,
            tasks,
            reasons: take(reason_set),
            placeholder_for_future_fields: (),
        })
    } else {
        panic!("aggregated_update_info must not called concurrently")
    }
}
1048
1049 pub async fn wait_background_done(&self) {
1050 let listener = self.event_background.listen();
1051 if self
1052 .currently_scheduled_background_jobs
1053 .load(Ordering::Acquire)
1054 != 0
1055 {
1056 listener.await;
1057 }
1058 }
1059
1060 pub async fn stop_and_wait(&self) {
1061 turbo_tasks_future_scope(self.pin(), async move {
1062 self.backend.stopping(self);
1063 self.stopped.store(true, Ordering::Release);
1064 {
1065 let listener = self
1066 .event
1067 .listen_with_note(|| || "wait for stop".to_string());
1068 if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1069 listener.await;
1070 }
1071 }
1072 {
1073 let listener = self.event_background.listen();
1074 if self
1075 .currently_scheduled_background_jobs
1076 .load(Ordering::Acquire)
1077 != 0
1078 {
1079 listener.await;
1080 }
1081 }
1082 self.backend.stop(self);
1083 })
1084 .await;
1085 }
1086
/// Spawns `func` as a background job.
///
/// The job first waits until no primary jobs are in flight. `func` is skipped when the system
/// has been stopped by then. `event_background` is notified when the last background job
/// finishes.
#[track_caller]
pub(crate) fn schedule_background_job<
    T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
    F: Future<Output = ()> + Send + 'static,
>(
    &self,
    func: T,
) {
    let this = self.pin();
    self.currently_scheduled_background_jobs
        .fetch_add(1, Ordering::AcqRel);
    tokio::spawn(
        TURBO_TASKS
            .scope(this.clone(), async move {
                // Wait for primary work to drain; loop because new work may have started
                // again by the time the listener fires.
                while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
                    let listener = this.event.listen_with_note(|| {
                        || "background job waiting for execution".to_string()
                    });
                    // Re-check after registering the listener to avoid a missed wake-up.
                    if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
                        listener.await;
                    }
                }
                // `this` is moved into `func`, so keep a second handle for the bookkeeping.
                let this2 = this.clone();
                if !this.stopped.load(Ordering::Acquire) {
                    func(this).await;
                }
                if this2
                    .currently_scheduled_background_jobs
                    .fetch_sub(1, Ordering::AcqRel)
                    == 1
                {
                    this2.event_background.notify(usize::MAX);
                }
            })
            .in_current_span(),
    );
}
1124
1125 #[track_caller]
1126 pub(crate) fn schedule_foreground_job<
1127 T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1128 F: Future<Output = ()> + Send + 'static,
1129 >(
1130 &self,
1131 func: T,
1132 ) {
1133 let this = self.pin();
1134 this.begin_foreground_job();
1135 tokio::spawn(
1136 TURBO_TASKS
1137 .scope(this.clone(), async move {
1138 if !this.stopped.load(Ordering::Acquire) {
1139 func(this.clone()).await;
1140 }
1141 this.finish_foreground_job();
1142 })
1143 .in_current_span(),
1144 );
1145 }
1146
1147 fn finish_current_task_state(&self) -> FinishedTaskState {
1148 let (stateful, has_invalidator, tasks) = CURRENT_TASK_STATE.with(|cell| {
1149 let CurrentTaskState {
1150 tasks_to_notify,
1151 stateful,
1152 has_invalidator,
1153 ..
1154 } = &mut *cell.write().unwrap();
1155 (*stateful, *has_invalidator, take(tasks_to_notify))
1156 });
1157
1158 if !tasks.is_empty() {
1159 self.backend.invalidate_tasks(&tasks, self);
1160 }
1161 FinishedTaskState {
1162 stateful,
1163 has_invalidator,
1164 }
1165 }
1166
/// Returns a reference to the backend.
pub fn backend(&self) -> &B {
    &self.backend
}
1170}
1171
/// Flags copied out of `CurrentTaskState` by `finish_current_task_state`; `schedule` passes
/// both to the backend's `task_execution_completed`.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`.
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1179
1180impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
/// Delegates to the inherent `TurboTasks::dynamic_call`.
fn dynamic_call(
    &self,
    native_fn: &'static NativeFunction,
    this: Option<RawVc>,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    self.dynamic_call(native_fn, this, arg, persistence)
}
/// Delegates to the inherent `TurboTasks::native_call`.
fn native_call(
    &self,
    native_fn: &'static NativeFunction,
    this: Option<RawVc>,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    self.native_call(native_fn, this, arg, persistence)
}
/// Delegates to the inherent `TurboTasks::trait_call`.
fn trait_call(
    &self,
    trait_method: &'static TraitMethod,
    this: RawVc,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    self.trait_call(trait_method, this, arg, persistence)
}
1208
/// Spawns a once-task that awaits `future` and outputs a `Completion` on success; an error
/// from `future` becomes the task's error.
#[track_caller]
fn run_once(
    &self,
    future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
    self.spawn_once_task(async move {
        future.await?;
        Ok(Completion::new())
    })
}
1219
1220 #[track_caller]
1221 fn run_once_with_reason(
1222 &self,
1223 reason: StaticOrArc<dyn InvalidationReason>,
1224 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1225 ) -> TaskId {
1226 {
1227 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1228 reason_set.insert(reason);
1229 }
1230 self.spawn_once_task(async move {
1231 future.await?;
1232 Ok(Completion::new())
1233 })
1234 }
1235
1236 #[track_caller]
1237 fn run_once_process(
1238 &self,
1239 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1240 ) -> TaskId {
1241 let this = self.pin();
1242 self.spawn_once_task(async move {
1243 this.finish_primary_job();
1244 future.await?;
1245 this.begin_primary_job();
1246 Ok(Completion::new())
1247 })
1248 }
1249}
1250
1251impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
/// Forwards to the backend's `invalidate_task` for `task`.
#[instrument(level = Level::INFO, skip_all, name = "invalidate")]
fn invalidate(&self, task: TaskId) {
    self.backend.invalidate_task(task, self);
}
1256
1257 #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))]
1258 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1259 {
1260 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1261 reason_set.insert(reason);
1262 }
1263 self.backend.invalidate_task(task, self);
1264 }
1265
/// Forwards to the backend's `invalidate_serialization` for `task`.
fn invalidate_serialization(&self, task: TaskId) {
    self.backend.invalidate_serialization(task, self);
}
1269
1270 fn notify_scheduled_tasks(&self) {
1271 let _ = CURRENT_TASK_STATE.try_with(|cell| {
1272 let tasks = {
1273 let CurrentTaskState {
1274 tasks_to_notify, ..
1275 } = &mut *cell.write().unwrap();
1276 take(tasks_to_notify)
1277 };
1278 if tasks.is_empty() {
1279 return;
1280 }
1281 self.backend.invalidate_tasks(&tasks, self);
1282 });
1283 }
1284
/// Forwards to the backend's `try_read_task_output`, passing the currently executing task as
/// the reader.
fn try_read_task_output(
    &self,
    task: TaskId,
    consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
    self.backend
        .try_read_task_output(task, current_task("reading Vcs"), consistency, self)
}
1293
/// Forwards to the backend's `try_read_task_output_untracked`; no reader task is passed.
fn try_read_task_output_untracked(
    &self,
    task: TaskId,
    consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
    self.backend
        .try_read_task_output_untracked(task, consistency, self)
}
1302
/// Forwards to the backend's `try_read_task_cell`, passing the currently executing task as the
/// reader.
fn try_read_task_cell(
    &self,
    task: TaskId,
    index: CellId,
    options: ReadCellOptions,
) -> Result<Result<TypedCellContent, EventListener>> {
    self.backend
        .try_read_task_cell(task, index, current_task("reading Vcs"), options, self)
}
1312
/// Forwards to the backend's `try_read_task_cell_untracked`; no reader task is passed.
fn try_read_task_cell_untracked(
    &self,
    task: TaskId,
    index: CellId,
    options: ReadCellOptions,
) -> Result<Result<TypedCellContent, EventListener>> {
    self.backend
        .try_read_task_cell_untracked(task, index, options, self)
}
1322
/// Forwards to the backend's `try_read_own_task_cell_untracked` for `current_task`.
fn try_read_own_task_cell_untracked(
    &self,
    current_task: TaskId,
    index: CellId,
    options: ReadCellOptions,
) -> Result<TypedCellContent> {
    self.backend
        .try_read_own_task_cell_untracked(current_task, index, options, self)
}
1332
1333 fn try_read_local_output(
1334 &self,
1335 execution_id: ExecutionId,
1336 local_task_id: LocalTaskId,
1337 ) -> Result<Result<RawVc, EventListener>> {
1338 CURRENT_TASK_STATE.with(|gts| {
1339 let gts_read = gts.read().unwrap();
1340
1341 gts_read.assert_execution_id(execution_id);
1346
1347 match gts_read.get_local_task(local_task_id) {
1348 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1349 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1350 }
1351 })
1352 }
1353
1354 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1355 self.backend.read_task_collectibles(
1356 task,
1357 trait_id,
1358 current_task("reading collectibles"),
1359 self,
1360 )
1361 }
1362
1363 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1364 self.backend.emit_collectible(
1365 trait_type,
1366 collectible,
1367 current_task("emitting collectible"),
1368 self,
1369 );
1370 }
1371
1372 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1373 self.backend.unemit_collectible(
1374 trait_type,
1375 collectible,
1376 count,
1377 current_task("emitting collectible"),
1378 self,
1379 );
1380 }
1381
1382 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1383 for (&collectible, &count) in collectibles {
1384 if count > 0 {
1385 self.backend.unemit_collectible(
1386 trait_type,
1387 collectible,
1388 count as u32,
1389 current_task("emitting collectible"),
1390 self,
1391 );
1392 }
1393 }
1394 }
1395
    /// Reads cell `index` of `task` by delegating to `try_read_own_task_cell_untracked`, i.e.
    /// the read goes through the untracked own-task path.
    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent> {
        self.try_read_own_task_cell_untracked(task, index, options)
    }
1405
    /// Forwards the new `content` for cell `index` of `task` to the backend's
    /// `update_task_cell`.
    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
        self.backend.update_task_cell(task, index, content, self);
    }
1409
1410 fn connect_task(&self, task: TaskId) {
1411 self.backend
1412 .connect_task(task, current_task("connecting task"), self);
1413 }
1414
    /// Forwards to the backend's `mark_own_task_as_finished` for `task`.
    fn mark_own_task_as_finished(&self, task: TaskId) {
        self.backend.mark_own_task_as_finished(task, self);
    }
1418
    /// Forwards `aggregation_number` for `task` to the backend's
    /// `set_own_task_aggregation_number`.
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
        self.backend
            .set_own_task_aggregation_number(task, aggregation_number, self);
    }
1423
    /// Forwards to the backend's `mark_own_task_as_session_dependent` for `task`.
    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
        self.backend.mark_own_task_as_session_dependent(task, self);
    }
1427
1428 fn detached_for_testing(
1431 &self,
1432 fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1433 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1434 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1437 let tracked_fut = {
1438 let ts = global_task_state.read().unwrap();
1439 ts.local_task_tracker.track_future(fut)
1440 };
1441 Box::pin(TURBO_TASKS.scope(
1442 turbo_tasks(),
1443 CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1444 ))
1445 }
1446
    /// Returns the backend's task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi {
        self.backend.task_statistics()
    }
1450
    /// Returns a boxed `'static` future that awaits `stop_and_wait()` on the handle obtained
    /// from `self.pin()`, so the future does not borrow `self`.
    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let this = self.pin();
        Box::pin(async move {
            this.stop_and_wait().await;
        })
    }
1457
    /// Subscribes to the compilation event queue and returns the receiving end.
    ///
    /// `event_types` is passed through to the queue's `subscribe` unchanged.
    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>> {
        self.compilation_events.subscribe(event_types)
    }
1464
    /// Sends `event` to the compilation event queue.
    ///
    /// A send failure is not propagated to the caller; it is only logged as a warning.
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }
1470}
1471
/// Backend-facing API of [`TurboTasks`]: job scheduling, foreground-job bookkeeping, deferred
/// task notification, task id allocation and access to the backend's per-task state.
impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    /// Returns a shared handle to this instance as a trait object.
    // NOTE(review): `self.pin()` presumably resolves to an inherent `TurboTasks::pin` defined
    // outside this section (otherwise this would recurse) — confirm.
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    /// Returns a reference to the backend.
    fn backend(&self) -> &B {
        &self.backend
    }

    /// Schedules a background job that runs `job` through the backend's `run_backend_job`.
    #[track_caller]
    fn schedule_backend_background_job(&self, job: B::BackendJob) {
        self.schedule_background_job(move |this| async move {
            this.backend.run_backend_job(job, &*this).await;
        })
    }

    /// Schedules a foreground job that runs `job` through the backend's `run_backend_job`.
    #[track_caller]
    fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
        self.schedule_foreground_job(move |this| async move {
            this.backend.run_backend_job(job, &*this).await;
        })
    }

    /// Returns `Ok(())` when the foreground job counter is zero, otherwise a listener on
    /// `event_foreground`.
    fn try_foreground_done(&self) -> Result<(), EventListener> {
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return Ok(());
        }
        let listener = self.event_foreground.listen();
        // The counter is loaded a second time after the listener exists; presumably this closes
        // the window in which the counter drops to zero between the first load and `listen()`.
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return Ok(());
        }
        Err(listener)
    }

    /// Returns `None` when the foreground job counter is zero. Otherwise returns a future that
    /// calls `finish_foreground_job`, awaits `wait_foreground_done` and then calls
    /// `begin_foreground_job`.
    // NOTE(review): going by the name, the finish/begin pair temporarily removes the caller's
    // own foreground job from the count while waiting — confirm against those helpers.
    fn wait_foreground_done_excluding_own<'a>(
        &'a self,
    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>> {
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return None;
        }
        Some(Box::pin(async {
            self.finish_foreground_job();
            self.wait_foreground_done().await;
            self.begin_foreground_job();
        }))
    }

    /// Queues `tasks` in the current task state's `tasks_to_notify` list.
    ///
    /// When no `CURRENT_TASK_STATE` is accessible, the tasks are passed to the backend's
    /// `invalidate_tasks` right away instead.
    fn schedule_notify_tasks(&self, tasks: &[TaskId]) {
        let result = CURRENT_TASK_STATE.try_with(|cell| {
            let CurrentTaskState {
                tasks_to_notify, ..
            } = &mut *cell.write().unwrap();
            tasks_to_notify.extend(tasks.iter().copied());
        });
        if result.is_err() {
            let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered();
            self.backend.invalidate_tasks(tasks, self);
        }
    }

    /// Set-based variant of `schedule_notify_tasks`: queues the tasks in `tasks_to_notify`, or
    /// passes the set to the backend's `invalidate_tasks_set` when no `CURRENT_TASK_STATE` is
    /// accessible.
    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) {
        let result = CURRENT_TASK_STATE.try_with(|cell| {
            let CurrentTaskState {
                tasks_to_notify, ..
            } = &mut *cell.write().unwrap();
            tasks_to_notify.extend(tasks.iter().copied());
        });
        if result.is_err() {
            let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered();
            self.backend.invalidate_tasks_set(tasks, self);
        };
    }

    /// Schedules `task` for execution.
    // NOTE(review): `self.schedule(task)` presumably resolves to an inherent method defined
    // outside this section — confirm.
    #[track_caller]
    fn schedule(&self, task: TaskId) {
        self.schedule(task)
    }

    /// Returns the duration between `program_start` and `instant`.
    // NOTE(review): the std docs for `Instant - Instant` say it currently saturates to zero when
    // the right-hand side is later, but may panic again in future versions;
    // `saturating_duration_since` would make the behavior explicit.
    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    /// Takes an id from `task_id_factory` and wraps it as `Unused`.
    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        // SAFETY (assumed, confirm against `Unused::new_unchecked`): the id was just handed out
        // by the factory, so nothing else should be using it yet.
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    /// Takes an id from `transient_task_id_factory` and wraps it as `Unused`.
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        // SAFETY (assumed, confirm against `Unused::new_unchecked`): the id was just handed out
        // by the factory, so nothing else should be using it yet.
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    /// Hands `id` back to `task_id_factory` for reuse.
    ///
    /// # Safety
    ///
    /// The call is forwarded to the factory's unsafe `reuse`; the caller must uphold whatever
    /// that method requires (not visible in this section).
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    /// Hands `id` back to `transient_task_id_factory` for reuse.
    ///
    /// # Safety
    ///
    /// The call is forwarded to the factory's unsafe `reuse`; the caller must uphold whatever
    /// that method requires (not visible in this section).
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    /// Calls `func` with shared access to the backend state stored in the current task state.
    ///
    /// Panics when `CURRENT_TASK_STATE` is not set or when the stored state cannot be downcast
    /// to `B::TaskState`.
    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState)) {
        CURRENT_TASK_STATE
            .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
    }

    /// Calls `func` with exclusive access to the backend state stored in the current task state.
    ///
    /// Panics when `CURRENT_TASK_STATE` is not set or when the stored state cannot be downcast
    /// to `B::TaskState`.
    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState)) {
        CURRENT_TASK_STATE
            .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
    }

    /// Returns `true` when the `currently_scheduled_tasks` counter is zero.
    fn is_idle(&self) -> bool {
        self.currently_scheduled_tasks.load(Ordering::Acquire) == 0
    }
}
1601
1602pub(crate) fn current_task(from: &str) -> TaskId {
1603 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1604 Ok(id) => id,
1605 Err(_) => panic!("{from} can only be used in the context of turbo_tasks task execution"),
1606 }
1607}
1608
1609pub async fn run_once<T: Send + 'static>(
1610 tt: Arc<dyn TurboTasksApi>,
1611 future: impl Future<Output = Result<T>> + Send + 'static,
1612) -> Result<T> {
1613 let (tx, rx) = tokio::sync::oneshot::channel();
1614
1615 let task_id = tt.run_once(Box::pin(async move {
1616 let result = future.await?;
1617 tx.send(result)
1618 .map_err(|_| anyhow!("unable to send result"))?;
1619 Ok(())
1620 }));
1621
1622 let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1625 let raw_future = raw_result.into_read().untracked();
1626 turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1627
1628 Ok(rx.await?)
1629}
1630
1631pub async fn run_once_with_reason<T: Send + 'static>(
1632 tt: Arc<dyn TurboTasksApi>,
1633 reason: impl InvalidationReason,
1634 future: impl Future<Output = Result<T>> + Send + 'static,
1635) -> Result<T> {
1636 let (tx, rx) = tokio::sync::oneshot::channel();
1637
1638 let task_id = tt.run_once_with_reason(
1639 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1640 Box::pin(async move {
1641 let result = future.await?;
1642 tx.send(result)
1643 .map_err(|_| anyhow!("unable to send result"))?;
1644 Ok(())
1645 }),
1646 );
1647
1648 let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1651 let raw_future = raw_result.into_read().untracked();
1652 turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1653
1654 Ok(rx.await?)
1655}
1656
/// Calls `dynamic_call` on the `TurboTasksApi` handle from the `TURBO_TASKS` task-local,
/// forwarding all arguments unchanged.
///
/// Panics when `TURBO_TASKS` is not set.
pub fn dynamic_call(
    func: &'static NativeFunction,
    this: Option<RawVc>,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
}
1666
/// Calls `trait_call` on the `TurboTasksApi` handle from the `TURBO_TASKS` task-local,
/// forwarding all arguments unchanged.
///
/// Panics when `TURBO_TASKS` is not set.
pub fn trait_call(
    trait_method: &'static TraitMethod,
    this: RawVc,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
}
1676
1677pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1678 TURBO_TASKS.with(|arc| arc.clone())
1679}
1680
1681pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1682 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1683}
1684
1685pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1686 TURBO_TASKS.with(|arc| func(arc))
1687}
1688
/// Runs the synchronous closure `f` with `TURBO_TASKS` set to `tt` and returns its result.
pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}
1692
/// Wraps the future `f` so that it runs with `TURBO_TASKS` set to `tt`.
pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}
1699
1700pub fn with_turbo_tasks_for_testing<T>(
1701 tt: Arc<dyn TurboTasksApi>,
1702 current_task: TaskId,
1703 execution_id: ExecutionId,
1704 f: impl Future<Output = T>,
1705) -> impl Future<Output = T> {
1706 TURBO_TASKS.scope(
1707 tt,
1708 CURRENT_TASK_STATE.scope(
1709 Arc::new(RwLock::new(CurrentTaskState::new(
1710 current_task,
1711 execution_id,
1712 Box::new(()),
1713 ))),
1714 f,
1715 ),
1716 )
1717}
1718
1719pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1724 tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1725}
1726
/// Returns the task id stored in `CURRENT_TASK_STATE`.
///
/// Panics when `CURRENT_TASK_STATE` is not set.
pub fn current_task_for_testing() -> TaskId {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}
1730
1731pub fn mark_session_dependent() {
1733 with_turbo_tasks(|tt| {
1734 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1735 });
1736}
1737
1738pub fn mark_root() {
1741 with_turbo_tasks(|tt| {
1742 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1743 });
1744}
1745
1746pub fn mark_finished() {
1749 with_turbo_tasks(|tt| {
1750 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1751 });
1752}
1753
1754pub fn mark_stateful() -> SerializationInvalidator {
1760 CURRENT_TASK_STATE.with(|cell| {
1761 let CurrentTaskState {
1762 stateful, task_id, ..
1763 } = &mut *cell.write().unwrap();
1764 *stateful = true;
1765 SerializationInvalidator::new(*task_id)
1766 })
1767}
1768
1769pub fn mark_invalidator() {
1770 CURRENT_TASK_STATE.with(|cell| {
1771 let CurrentTaskState {
1772 has_invalidator, ..
1773 } = &mut *cell.write().unwrap();
1774 *has_invalidator = true;
1775 })
1776}
1777
/// Calls `mark_stateful` on the current task and discards the returned invalidator.
pub fn prevent_gc() {
    mark_stateful();
}
1782
/// Calls `notify_scheduled_tasks` on the `TurboTasksApi` handle from the `TURBO_TASKS`
/// task-local.
///
/// Panics when `TURBO_TASKS` is not set.
pub fn notify_scheduled_tasks() {
    with_turbo_tasks(|tt| tt.notify_scheduled_tasks())
}
1787
1788pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1789 with_turbo_tasks(|tt| {
1790 let raw_vc = collectible.node.node;
1791 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1792 })
1793}
1794
1795pub(crate) async fn read_task_output(
1796 this: &dyn TurboTasksApi,
1797 id: TaskId,
1798 consistency: ReadConsistency,
1799) -> Result<RawVc> {
1800 loop {
1801 match this.try_read_task_output(id, consistency)? {
1802 Ok(result) => return Ok(result),
1803 Err(listener) => listener.await,
1804 }
1805 }
1806}
1807
1808pub(crate) async fn read_task_output_untracked(
1811 this: &dyn TurboTasksApi,
1812 id: TaskId,
1813 consistency: ReadConsistency,
1814) -> Result<RawVc> {
1815 loop {
1816 match this.try_read_task_output_untracked(id, consistency)? {
1817 Ok(result) => return Ok(result),
1818 Err(listener) => listener.await,
1819 }
1820 }
1821}
1822
1823pub(crate) async fn read_task_cell(
1824 this: &dyn TurboTasksApi,
1825 id: TaskId,
1826 index: CellId,
1827 options: ReadCellOptions,
1828) -> Result<TypedCellContent> {
1829 loop {
1830 match this.try_read_task_cell(id, index, options)? {
1831 Ok(result) => return Ok(result),
1832 Err(listener) => listener.await,
1833 }
1834 }
1835}
1836
/// Reference to one cell, identified by the owning task and the cell's [`CellId`].
///
/// In this file, values of this type are created by `find_cell_by_type` for the currently
/// executing task.
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct CurrentCellRef {
    /// Task that the cell belongs to.
    current_task: TaskId,
    /// Id of the cell within that task.
    index: CellId,
}
1847
/// Shorthand for the `Repr` type of the `VcRead` implementation selected by `T::Read`.
type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1849
1850impl CurrentCellRef {
1851 pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1853 where
1854 T: VcValueType,
1855 {
1856 self.conditional_update_with_shared_reference(|old_shared_reference| {
1857 let old_ref = old_shared_reference
1858 .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1859 .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1860 let new_value = functor(old_ref)?;
1861 Some(SharedReference::new(triomphe::Arc::new(
1862 <T::Read as VcRead<T>>::value_to_repr(new_value),
1863 )))
1864 })
1865 }
1866
1867 pub fn conditional_update_with_shared_reference(
1869 &self,
1870 functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1871 ) {
1872 let tt = turbo_tasks();
1873 let cell_content = tt
1874 .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1875 .ok();
1876 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1877 if let Some(update) = update {
1878 tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1879 }
1880 }
1881
1882 pub fn compare_and_update<T>(&self, new_value: T)
1916 where
1917 T: PartialEq + VcValueType,
1918 {
1919 self.conditional_update(|old_value| {
1920 if let Some(old_value) = old_value
1921 && old_value == &new_value
1922 {
1923 return None;
1924 }
1925 Some(new_value)
1926 });
1927 }
1928
1929 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1938 where
1939 T: VcValueType + PartialEq,
1940 {
1941 fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1942 <T::Read as VcRead<T>>::repr_to_value_ref(
1943 sr.0.downcast_ref::<VcReadRepr<T>>()
1944 .expect("cannot update SharedReference of different type"),
1945 )
1946 }
1947 self.conditional_update_with_shared_reference(|old_sr| {
1948 if let Some(old_sr) = old_sr {
1949 let old_value: &T = extract_sr_value(old_sr);
1950 let new_value = extract_sr_value(&new_shared_reference);
1951 if old_value == new_value {
1952 return None;
1953 }
1954 }
1955 Some(new_shared_reference)
1956 });
1957 }
1958
1959 pub fn update<T>(&self, new_value: T)
1961 where
1962 T: VcValueType,
1963 {
1964 let tt = turbo_tasks();
1965 tt.update_own_task_cell(
1966 self.current_task,
1967 self.index,
1968 CellContent(Some(SharedReference::new(triomphe::Arc::new(
1969 <T::Read as VcRead<T>>::value_to_repr(new_value),
1970 )))),
1971 )
1972 }
1973
1974 pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
1983 let tt = turbo_tasks();
1984 let content = tt
1985 .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1986 .ok();
1987 let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1988 shared_ref_exp != shared_ref
1990 } else {
1991 true
1992 };
1993 if update {
1994 tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
1995 }
1996 }
1997}
1998
1999impl From<CurrentCellRef> for RawVc {
2000 fn from(cell: CurrentCellRef) -> Self {
2001 RawVc::TaskCell(cell.current_task, cell.index)
2002 }
2003}
2004
2005pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
2006 CURRENT_TASK_STATE.with(|ts| {
2007 let current_task = current_task("celling turbo_tasks values");
2008 let mut ts = ts.write().unwrap();
2009 let map = ts.cell_counters.as_mut().unwrap();
2010 let current_index = map.entry(ty).or_default();
2011 let index = *current_index;
2012 *current_index += 1;
2013 CurrentCellRef {
2014 current_task,
2015 index: CellId { type_id: ty, index },
2016 }
2017 })
2018}
2019
2020pub(crate) async fn read_local_output(
2021 this: &dyn TurboTasksApi,
2022 execution_id: ExecutionId,
2023 local_task_id: LocalTaskId,
2024) -> Result<RawVc> {
2025 loop {
2026 match this.try_read_local_output(execution_id, local_task_id)? {
2027 Ok(raw_vc) => return Ok(raw_vc),
2028 Err(event_listener) => event_listener.await,
2029 }
2030 }
2031}