1use std::{
2 any::Any,
3 future::Future,
4 hash::BuildHasherDefault,
5 mem::take,
6 pin::Pin,
7 sync::{
8 Arc, Mutex, RwLock, Weak,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 thread,
12 time::{Duration, Instant},
13};
14
15use anyhow::{Result, anyhow};
16use auto_hash_map::AutoMap;
17use rustc_hash::FxHasher;
18use serde::{Deserialize, Serialize};
19use smallvec::SmallVec;
20use tokio::{runtime::Handle, select, sync::mpsc::Receiver, task_local};
21use tokio_util::task::TaskTracker;
22use tracing::{Instrument, Level, Span, info_span, instrument, trace_span};
23use turbo_tasks_malloc::TurboMalloc;
24
25use crate::{
26 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
27 ResolvedVc, SharedReference, TaskId, TaskIdSet, TraitMethod, ValueTypeId, Vc, VcRead,
28 VcValueTrait, VcValueType,
29 backend::{
30 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
31 TransientTaskType, TurboTasksExecutionError, TypedCellContent,
32 },
33 capture_future::{self, CaptureFuture},
34 event::{Event, EventListener},
35 id::{BackendJobId, ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
36 id_factory::IdFactoryWithReuse,
37 macro_helpers::NativeFunction,
38 magic_any::MagicAny,
39 message_queue::{CompilationEvent, CompilationEventQueue},
40 raw_vc::{CellId, RawVc},
41 registry,
42 serialization_invalidation::SerializationInvalidator,
43 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
44 task_statistics::TaskStatisticsApi,
45 trace::TraceRawVcs,
46 util::{IdFactory, StaticOrArc},
47 vc::ReadVcFuture,
48};
49
50pub trait TurboTasksCallApi: Sync + Send {
53 fn dynamic_call(
56 &self,
57 native_fn: &'static NativeFunction,
58 this: Option<RawVc>,
59 arg: Box<dyn MagicAny>,
60 persistence: TaskPersistence,
61 ) -> RawVc;
62 fn native_call(
65 &self,
66 native_fn: &'static NativeFunction,
67 this: Option<RawVc>,
68 arg: Box<dyn MagicAny>,
69 persistence: TaskPersistence,
70 ) -> RawVc;
71 fn trait_call(
74 &self,
75 trait_method: &'static TraitMethod,
76 this: RawVc,
77 arg: Box<dyn MagicAny>,
78 persistence: TaskPersistence,
79 ) -> RawVc;
80
81 fn run_once(
82 &self,
83 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84 ) -> TaskId;
85 fn run_once_with_reason(
86 &self,
87 reason: StaticOrArc<dyn InvalidationReason>,
88 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89 ) -> TaskId;
90 fn run_once_process(
91 &self,
92 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
93 ) -> TaskId;
94}
95
96pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
102 fn invalidate(&self, task: TaskId);
103 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
104
105 fn invalidate_serialization(&self, task: TaskId);
106
107 fn notify_scheduled_tasks(&self);
110
111 fn try_read_task_output(
112 &self,
113 task: TaskId,
114 consistency: ReadConsistency,
115 ) -> Result<Result<RawVc, EventListener>>;
116
117 fn try_read_task_output_untracked(
120 &self,
121 task: TaskId,
122 consistency: ReadConsistency,
123 ) -> Result<Result<RawVc, EventListener>>;
124
125 fn try_read_task_cell(
126 &self,
127 task: TaskId,
128 index: CellId,
129 options: ReadCellOptions,
130 ) -> Result<Result<TypedCellContent, EventListener>>;
131
132 fn try_read_task_cell_untracked(
135 &self,
136 task: TaskId,
137 index: CellId,
138 options: ReadCellOptions,
139 ) -> Result<Result<TypedCellContent, EventListener>>;
140
141 fn try_read_local_output(
156 &self,
157 execution_id: ExecutionId,
158 local_task_id: LocalTaskId,
159 ) -> Result<Result<RawVc, EventListener>>;
160
161 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
162
163 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
164 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
165 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
166
167 fn try_read_own_task_cell_untracked(
170 &self,
171 current_task: TaskId,
172 index: CellId,
173 options: ReadCellOptions,
174 ) -> Result<TypedCellContent>;
175
176 fn read_own_task_cell(
177 &self,
178 task: TaskId,
179 index: CellId,
180 options: ReadCellOptions,
181 ) -> Result<TypedCellContent>;
182 fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
183 fn mark_own_task_as_finished(&self, task: TaskId);
184 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
185 fn mark_own_task_as_session_dependent(&self, task: TaskId);
186
187 fn connect_task(&self, task: TaskId);
188
189 fn detached_for_testing(
194 &self,
195 f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
196 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
197
198 fn task_statistics(&self) -> &TaskStatisticsApi;
199
200 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
201
202 fn subscribe_to_compilation_events(
203 &self,
204 event_types: Option<Vec<String>>,
205 ) -> Receiver<Arc<dyn CompilationEvent>>;
206 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
207}
208
/// A wrapper around a value of type `T` that can only be created and peeked at through
/// `unsafe` functions, and unwrapped through [`Unused::into`].
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    /// Wraps `inner`.
    ///
    /// # Safety
    ///
    /// The contract is not enforced by this type. NOTE(review): judging by the name, the
    /// caller presumably vouches that `inner` is not in use anywhere else — confirm.
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Unused { inner }
    }

    /// Borrows the wrapped value without unwrapping it.
    ///
    /// # Safety
    ///
    /// The contract is not enforced by this type. NOTE(review): presumably the caller must
    /// not treat the borrowed value as "used" — confirm.
    pub unsafe fn get_unchecked(&self) -> &T {
        let Unused { inner } = self;
        inner
    }

    /// Consumes the wrapper and hands out the wrapped value.
    pub fn into(self) -> T {
        let Unused { inner } = self;
        inner
    }
}
238
239pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
241 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
242
243 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
244 fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
245 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
249 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
253
254 fn schedule(&self, task: TaskId);
255 fn schedule_backend_background_job(&self, id: BackendJobId);
256 fn schedule_backend_foreground_job(&self, id: BackendJobId);
257
258 fn try_foreground_done(&self) -> Result<(), EventListener>;
259 fn wait_foreground_done_excluding_own<'a>(
260 &'a self,
261 ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>>;
262
263 fn schedule_notify_tasks(&self, tasks: &[TaskId]);
266
267 fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet);
270
271 fn program_duration_until(&self, instant: Instant) -> Duration;
273
274 fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState));
277
278 fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState));
281
282 fn is_idle(&self) -> bool;
284
285 fn backend(&self) -> &B;
287}
288
289pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
292 fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
297 let mut func = Some(func);
298 let mut out = None;
299 self.read_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
300 out.expect("read_task_state_dyn must call `func`")
301 }
302
303 fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
308 let mut func = Some(func);
309 let mut out = None;
310 self.write_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
311 out.expect("write_task_state_dyn must call `func`")
312 }
313}
314
315impl<TT, B> TurboTasksBackendApiExt<B> for TT
316where
317 TT: TurboTasksBackendApi<B> + ?Sized,
318 B: Backend + 'static,
319{
320}
321
322#[allow(clippy::manual_non_exhaustive)]
323pub struct UpdateInfo {
324 pub duration: Duration,
325 pub tasks: usize,
326 pub reasons: InvalidationReasonSet,
327 #[allow(dead_code)]
328 placeholder_for_future_fields: (),
329}
330
331#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
332pub enum TaskPersistence {
333 Persistent,
335
336 Transient,
343
344 Local,
353}
354
/// Consistency level requested when reading a task output.
///
/// The value is passed through to the backend unchanged; the precise guarantees of each
/// level are defined there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadConsistency {
    /// NOTE(review): presumably the read may return before all dependencies have
    /// settled — confirm against the backend.
    Eventual,
    /// NOTE(review): presumably the read waits for a fully settled result — confirm
    /// against the backend.
    Strong,
}
366
367pub struct TurboTasks<B: Backend + 'static> {
368 this: Weak<Self>,
369 backend: B,
370 task_id_factory: IdFactoryWithReuse<TaskId>,
371 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
372 execution_id_factory: IdFactory<ExecutionId>,
373 stopped: AtomicBool,
374 currently_scheduled_tasks: AtomicUsize,
375 currently_scheduled_foreground_jobs: AtomicUsize,
376 currently_scheduled_background_jobs: AtomicUsize,
377 scheduled_tasks: AtomicUsize,
378 start: Mutex<Option<Instant>>,
379 aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
380 event: Event,
381 event_start: Event,
382 event_foreground: Event,
383 event_background: Event,
384 program_start: Instant,
385 compilation_events: CompilationEventQueue,
386}
387
388struct CurrentTaskState {
397 task_id: TaskId,
398 execution_id: ExecutionId,
399
400 tasks_to_notify: SmallVec<[TaskId; 4]>,
404
405 stateful: bool,
407
408 has_invalidator: bool,
410
411 cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
416
417 local_tasks: Vec<LocalTask>,
419
420 local_task_tracker: TaskTracker,
423
424 backend_state: Box<dyn Any + Send + Sync>,
425}
426
427impl CurrentTaskState {
428 fn new(
429 task_id: TaskId,
430 execution_id: ExecutionId,
431 backend_state: Box<dyn Any + Send + Sync>,
432 ) -> Self {
433 Self {
434 task_id,
435 execution_id,
436 tasks_to_notify: SmallVec::new(),
437 stateful: false,
438 has_invalidator: false,
439 cell_counters: Some(AutoMap::default()),
440 local_tasks: Vec::new(),
441 local_task_tracker: TaskTracker::new(),
442 backend_state,
443 }
444 }
445
446 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
447 if self.execution_id != expected_execution_id {
448 panic!(
449 "Local tasks can only be scheduled/awaited within the same execution of the \
450 parent task that created them"
451 );
452 }
453 }
454
455 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
456 self.local_tasks.push(local_task);
457 if cfg!(debug_assertions) {
459 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
460 } else {
461 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
462 }
463 }
464
465 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
466 &self.local_tasks[(*local_task_id as usize) - 1]
468 }
469
470 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
471 &mut self.local_tasks[(*local_task_id as usize) - 1]
472 }
473}
474
475task_local! {
477 static TURBO_TASKS: Arc<dyn TurboTasksApi>;
479
480 static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
481}
482
483impl<B: Backend + 'static> TurboTasks<B> {
484 pub fn new(backend: B) -> Arc<Self> {
490 let task_id_factory = IdFactoryWithReuse::new(
491 TaskId::MIN,
492 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
493 );
494 let transient_task_id_factory =
495 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
496 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
497 let this = Arc::new_cyclic(|this| Self {
498 this: this.clone(),
499 backend,
500 task_id_factory,
501 transient_task_id_factory,
502 execution_id_factory,
503 stopped: AtomicBool::new(false),
504 currently_scheduled_tasks: AtomicUsize::new(0),
505 currently_scheduled_background_jobs: AtomicUsize::new(0),
506 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
507 scheduled_tasks: AtomicUsize::new(0),
508 start: Default::default(),
509 aggregated_update: Default::default(),
510 event: Event::new(|| "TurboTasks::event".to_string()),
511 event_start: Event::new(|| "TurboTasks::event_start".to_string()),
512 event_foreground: Event::new(|| "TurboTasks::event_foreground".to_string()),
513 event_background: Event::new(|| "TurboTasks::event_background".to_string()),
514 program_start: Instant::now(),
515 compilation_events: CompilationEventQueue::default(),
516 });
517 this.backend.startup(&*this);
518 this
519 }
520
521 pub fn pin(&self) -> Arc<Self> {
522 self.this.upgrade().unwrap()
523 }
524
525 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
527 where
528 T: ?Sized,
529 F: Fn() -> Fut + Send + Sync + Clone + 'static,
530 Fut: Future<Output = Result<Vc<T>>> + Send,
531 {
532 let id = self.backend.create_transient_task(
533 TransientTaskType::Root(Box::new(move || {
534 let functor = functor.clone();
535 Box::pin(async move {
536 let raw_vc = functor().await?.node;
537 raw_vc.to_non_local().await
538 })
539 })),
540 self,
541 );
542 self.schedule(id);
543 id
544 }
545
546 pub fn dispose_root_task(&self, task_id: TaskId) {
547 self.backend.dispose_root_task(task_id, self);
548 }
549
550 #[track_caller]
554 pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
555 where
556 T: ?Sized,
557 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
558 {
559 let id = self.backend.create_transient_task(
560 TransientTaskType::Once(Box::pin(async move {
561 let raw_vc = future.await?.node;
562 raw_vc.to_non_local().await
563 })),
564 self,
565 );
566 self.schedule(id);
567 id
568 }
569
570 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
571 &self,
572 future: impl Future<Output = Result<T>> + Send + 'static,
573 ) -> Result<T> {
574 let (tx, rx) = tokio::sync::oneshot::channel();
575 let task_id = self.spawn_once_task(async move {
576 let result = future.await?;
577 tx.send(result)
578 .map_err(|_| anyhow!("unable to send result"))?;
579 Ok(Completion::new())
580 });
581 let raw_result =
584 read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
585 turbo_tasks_future_scope(
586 self.pin(),
587 ReadVcFuture::<Completion>::from(raw_result.into_read().untracked()),
588 )
589 .await?;
590
591 Ok(rx.await?)
592 }
593
594 pub(crate) fn native_call(
595 &self,
596 native_fn: &'static NativeFunction,
597 this: Option<RawVc>,
598 arg: Box<dyn MagicAny>,
599 persistence: TaskPersistence,
600 ) -> RawVc {
601 match persistence {
602 TaskPersistence::Local => {
603 let task_type = LocalTaskSpec {
604 task_type: LocalTaskType::Native { native_fn },
605 this,
606 arg,
607 };
608 self.schedule_local_task(task_type, persistence)
609 }
610 TaskPersistence::Transient => {
611 let task_type = CachedTaskType {
612 native_fn,
613 this,
614 arg,
615 };
616
617 RawVc::TaskOutput(self.backend.get_or_create_transient_task(
618 task_type,
619 current_task("turbo_function calls"),
620 self,
621 ))
622 }
623 TaskPersistence::Persistent => {
624 let task_type = CachedTaskType {
625 native_fn,
626 this,
627 arg,
628 };
629
630 RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
631 task_type,
632 current_task("turbo_function calls"),
633 self,
634 ))
635 }
636 }
637 }
638
639 pub fn dynamic_call(
640 &self,
641 native_fn: &'static NativeFunction,
642 this: Option<RawVc>,
643 arg: Box<dyn MagicAny>,
644 persistence: TaskPersistence,
645 ) -> RawVc {
646 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
647 return self.native_call(native_fn, this, arg, persistence);
648 }
649 let task_type = LocalTaskSpec {
650 task_type: LocalTaskType::ResolveNative { native_fn },
651 this,
652 arg,
653 };
654 self.schedule_local_task(task_type, persistence)
655 }
656
657 pub fn trait_call(
658 &self,
659 trait_method: &'static TraitMethod,
660 this: RawVc,
661 arg: Box<dyn MagicAny>,
662 persistence: TaskPersistence,
663 ) -> RawVc {
664 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
668 match registry::get_value_type(type_id).get_trait_method(trait_method) {
669 Some(native_fn) => {
670 let arg = native_fn.arg_meta.filter_owned(arg);
671 return self.dynamic_call(native_fn, Some(this), arg, persistence);
672 }
673 None => {
674 }
678 }
679 }
680
681 let task_type = LocalTaskSpec {
683 task_type: LocalTaskType::ResolveTrait { trait_method },
684 this: Some(this),
685 arg,
686 };
687
688 self.schedule_local_task(task_type, persistence)
689 }
690
691 #[track_caller]
692 pub(crate) fn schedule(&self, task_id: TaskId) {
693 self.begin_primary_job();
694 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
695
696 let this = self.pin();
697 let future = async move {
698 let mut schedule_again = true;
699 while schedule_again {
700 let backend_state = this.backend.new_task_state(task_id);
701 let execution_id = this.execution_id_factory.wrapping_get();
703 let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
704 task_id,
705 execution_id,
706 Box::new(backend_state),
707 )));
708 let single_execution_future = async {
709 if this.stopped.load(Ordering::Acquire) {
710 this.backend.task_execution_canceled(task_id, &*this);
711 return false;
712 }
713
714 let Some(TaskExecutionSpec { future, span }) =
715 this.backend.try_start_task_execution(task_id, &*this)
716 else {
717 return false;
718 };
719
720 async {
721 let (result, duration, memory_usage) = CaptureFuture::new(future).await;
722
723 let ltt = CURRENT_TASK_STATE
725 .with(|ts| ts.read().unwrap().local_task_tracker.clone());
726 ltt.close();
727 ltt.wait().await;
728
729 let result = match result {
730 Ok(Ok(raw_vc)) => Ok(raw_vc),
731 Ok(Err(err)) => Err(err.into()),
732 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
733 };
734
735 this.backend.task_execution_result(task_id, result, &*this);
736 let FinishedTaskState {
737 stateful,
738 has_invalidator,
739 } = this.finish_current_task_state();
740 let cell_counters = CURRENT_TASK_STATE
741 .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
742 let schedule_again = this.backend.task_execution_completed(
743 task_id,
744 duration,
745 memory_usage,
746 &cell_counters,
747 stateful,
748 has_invalidator,
749 &*this,
750 );
751 this.notify_scheduled_tasks();
753 schedule_again
754 }
755 .instrument(span)
756 .await
757 };
758 schedule_again = CURRENT_TASK_STATE
759 .scope(current_task_state, single_execution_future)
760 .await;
761 }
762 this.finish_primary_job();
763 anyhow::Ok(())
764 };
765
766 let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
767
768 #[cfg(feature = "tokio_tracing")]
769 {
770 let description = self.backend.get_task_description(task_id);
771 tokio::task::Builder::new()
772 .name(&description)
773 .spawn(future)
774 .unwrap();
775 }
776 #[cfg(not(feature = "tokio_tracing"))]
777 tokio::task::spawn(future);
778 }
779
780 fn schedule_local_task(
781 &self,
782 ty: LocalTaskSpec,
783 persistence: TaskPersistence,
791 ) -> RawVc {
792 let task_type = ty.task_type;
793 let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
794 .with(|gts| {
795 let mut gts_write = gts.write().unwrap();
796 let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
797 done_event: Event::new({
798 move || format!("LocalTask({task_type})::done_event")
799 }),
800 });
801 (
802 Arc::clone(gts),
803 gts_write.task_id,
804 gts_write.execution_id,
805 local_task_id,
806 )
807 });
808
809 #[cfg(feature = "tokio_tracing")]
810 let description = format!(
811 "[local] (parent: {}) {}",
812 self.backend.get_task_description(parent_task_id),
813 ty,
814 );
815 #[cfg(not(feature = "tokio_tracing"))]
816 let _ = parent_task_id; let this = self.pin();
819 let future = async move {
820 let TaskExecutionSpec { future, span } =
821 crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
822 async move {
823 let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
824
825 let result = match result {
826 Ok(Ok(raw_vc)) => Ok(raw_vc),
827 Ok(Err(err)) => Err(err.into()),
828 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
829 };
830
831 let local_task = LocalTask::Done {
832 output: match result {
833 Ok(raw_vc) => OutputContent::Link(raw_vc),
834 Err(err) => OutputContent::Error(err.task_context(task_type)),
835 },
836 };
837
838 let done_event = CURRENT_TASK_STATE.with(move |gts| {
839 let mut gts_write = gts.write().unwrap();
840 let scheduled_task =
841 std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
842 let LocalTask::Scheduled { done_event } = scheduled_task else {
843 panic!("local task finished, but was not in the scheduled state?");
844 };
845 done_event
846 });
847 done_event.notify(usize::MAX)
848 }
849 .instrument(span)
850 .await
851 };
852 let future = global_task_state
853 .read()
854 .unwrap()
855 .local_task_tracker
856 .track_future(future);
857 let future = CURRENT_TASK_STATE.scope(global_task_state, future);
858 let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
859
860 #[cfg(feature = "tokio_tracing")]
861 tokio::task::Builder::new()
862 .name(&description)
863 .spawn(future)
864 .unwrap();
865 #[cfg(not(feature = "tokio_tracing"))]
866 tokio::task::spawn(future);
867
868 RawVc::LocalOutput(execution_id, local_task_id, persistence)
869 }
870
871 fn begin_primary_job(&self) {
872 if self
873 .currently_scheduled_tasks
874 .fetch_add(1, Ordering::AcqRel)
875 == 0
876 {
877 *self.start.lock().unwrap() = Some(Instant::now());
878 self.event_start.notify(usize::MAX);
879 self.backend.idle_end(self);
880 }
881 }
882
883 fn begin_foreground_job(&self) {
884 self.begin_primary_job();
885 self.currently_scheduled_foreground_jobs
886 .fetch_add(1, Ordering::AcqRel);
887 }
888
889 fn finish_primary_job(&self) {
890 if self
891 .currently_scheduled_tasks
892 .fetch_sub(1, Ordering::AcqRel)
893 == 1
894 {
895 self.backend.idle_start(self);
896 let total = self.scheduled_tasks.load(Ordering::Acquire);
899 self.scheduled_tasks.store(0, Ordering::Release);
900 if let Some(start) = *self.start.lock().unwrap() {
901 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
902 if let Some(update) = update.as_mut() {
903 update.0 += start.elapsed();
904 update.1 += total;
905 } else {
906 *update = Some((start.elapsed(), total));
907 }
908 }
909 self.event.notify(usize::MAX);
910 }
911 }
912
913 fn finish_foreground_job(&self) {
914 if self
915 .currently_scheduled_foreground_jobs
916 .fetch_sub(1, Ordering::AcqRel)
917 == 1
918 {
919 self.event_foreground.notify(usize::MAX);
920 }
921 self.finish_primary_job();
922 }
923
924 pub async fn wait_foreground_done(&self) {
925 if self
926 .currently_scheduled_foreground_jobs
927 .load(Ordering::Acquire)
928 == 0
929 {
930 return;
931 }
932 let listener = self.event_foreground.listen();
933 if self
934 .currently_scheduled_foreground_jobs
935 .load(Ordering::Acquire)
936 == 0
937 {
938 return;
939 }
940 listener
941 .instrument(trace_span!("wait_foreground_done"))
942 .await;
943 }
944
945 pub fn get_in_progress_count(&self) -> usize {
946 self.currently_scheduled_tasks.load(Ordering::Acquire)
947 }
948
949 pub async fn wait_task_completion(
961 &self,
962 id: TaskId,
963 consistency: ReadConsistency,
964 ) -> Result<()> {
965 read_task_output_untracked(self, id, consistency).await?;
967 Ok(())
968 }
969
970 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
973 self.aggregated_update_info(aggregation, Duration::MAX)
974 .await
975 .unwrap()
976 }
977
978 pub async fn aggregated_update_info(
982 &self,
983 aggregation: Duration,
984 timeout: Duration,
985 ) -> Option<UpdateInfo> {
986 let listener = self
987 .event
988 .listen_with_note(|| "wait for update info".to_string());
989 let wait_for_finish = {
990 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
991 if aggregation.is_zero() {
992 if let Some((duration, tasks)) = update.take() {
993 return Some(UpdateInfo {
994 duration,
995 tasks,
996 reasons: take(reason_set),
997 placeholder_for_future_fields: (),
998 });
999 } else {
1000 true
1001 }
1002 } else {
1003 update.is_none()
1004 }
1005 };
1006 if wait_for_finish {
1007 if timeout == Duration::MAX {
1008 listener.await;
1010 } else {
1011 let start_listener = self
1013 .event_start
1014 .listen_with_note(|| "wait for update info".to_string());
1015 if self.currently_scheduled_tasks.load(Ordering::Acquire) == 0 {
1016 start_listener.await;
1017 } else {
1018 drop(start_listener);
1019 }
1020 if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1021 return None;
1023 }
1024 }
1025 }
1026 if !aggregation.is_zero() {
1027 loop {
1028 select! {
1029 () = tokio::time::sleep(aggregation) => {
1030 break;
1031 }
1032 () = self.event.listen_with_note(|| "wait for update info".to_string()) => {
1033 }
1035 }
1036 }
1037 }
1038 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1039 if let Some((duration, tasks)) = update.take() {
1040 Some(UpdateInfo {
1041 duration,
1042 tasks,
1043 reasons: take(reason_set),
1044 placeholder_for_future_fields: (),
1045 })
1046 } else {
1047 panic!("aggregated_update_info must not called concurrently")
1048 }
1049 }
1050
1051 pub async fn wait_background_done(&self) {
1052 let listener = self.event_background.listen();
1053 if self
1054 .currently_scheduled_background_jobs
1055 .load(Ordering::Acquire)
1056 != 0
1057 {
1058 listener.await;
1059 }
1060 }
1061
1062 pub async fn stop_and_wait(&self) {
1063 self.backend.stopping(self);
1064 self.stopped.store(true, Ordering::Release);
1065 {
1066 let listener = self.event.listen_with_note(|| "wait for stop".to_string());
1067 if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1068 listener.await;
1069 }
1070 }
1071 {
1072 let listener = self.event_background.listen();
1073 if self
1074 .currently_scheduled_background_jobs
1075 .load(Ordering::Acquire)
1076 != 0
1077 {
1078 listener.await;
1079 }
1080 }
1081 self.backend.stop(self);
1082 }
1083
1084 #[track_caller]
1085 pub(crate) fn schedule_background_job<
1086 T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1087 F: Future<Output = ()> + Send + 'static,
1088 >(
1089 &self,
1090 func: T,
1091 ) {
1092 let this = self.pin();
1093 self.currently_scheduled_background_jobs
1094 .fetch_add(1, Ordering::AcqRel);
1095 tokio::spawn(
1096 TURBO_TASKS
1097 .scope(this.clone(), async move {
1098 while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1099 let listener = this.event.listen_with_note(|| {
1100 "background job waiting for execution".to_string()
1101 });
1102 if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1103 listener.await;
1104 }
1105 }
1106 let this2 = this.clone();
1107 if !this.stopped.load(Ordering::Acquire) {
1108 func(this).await;
1109 }
1110 if this2
1111 .currently_scheduled_background_jobs
1112 .fetch_sub(1, Ordering::AcqRel)
1113 == 1
1114 {
1115 this2.event_background.notify(usize::MAX);
1116 }
1117 })
1118 .in_current_span(),
1119 );
1120 }
1121
1122 #[track_caller]
1123 pub(crate) fn schedule_foreground_job<
1124 T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
1125 F: Future<Output = ()> + Send + 'static,
1126 >(
1127 &self,
1128 func: T,
1129 ) {
1130 let this = self.pin();
1131 this.begin_foreground_job();
1132 tokio::spawn(
1133 TURBO_TASKS
1134 .scope(this.clone(), async move {
1135 if !this.stopped.load(Ordering::Acquire) {
1136 func(this.clone()).await;
1137 }
1138 this.finish_foreground_job();
1139 })
1140 .in_current_span(),
1141 );
1142 }
1143
1144 fn finish_current_task_state(&self) -> FinishedTaskState {
1145 let (stateful, has_invalidator, tasks) = CURRENT_TASK_STATE.with(|cell| {
1146 let CurrentTaskState {
1147 tasks_to_notify,
1148 stateful,
1149 has_invalidator,
1150 ..
1151 } = &mut *cell.write().unwrap();
1152 (*stateful, *has_invalidator, take(tasks_to_notify))
1153 });
1154
1155 if !tasks.is_empty() {
1156 self.backend.invalidate_tasks(&tasks, self);
1157 }
1158 FinishedTaskState {
1159 stateful,
1160 has_invalidator,
1161 }
1162 }
1163
1164 pub fn backend(&self) -> &B {
1165 &self.backend
1166 }
1167}
1168
/// The flags of a `CurrentTaskState` that are handed to the backend through
/// `task_execution_completed` once an execution has finished.
struct FinishedTaskState {
    /// Copy of `CurrentTaskState::stateful`.
    stateful: bool,

    /// Copy of `CurrentTaskState::has_invalidator`.
    has_invalidator: bool,
}
1176
1177impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1178 fn dynamic_call(
1179 &self,
1180 native_fn: &'static NativeFunction,
1181 this: Option<RawVc>,
1182 arg: Box<dyn MagicAny>,
1183 persistence: TaskPersistence,
1184 ) -> RawVc {
1185 self.dynamic_call(native_fn, this, arg, persistence)
1186 }
1187 fn native_call(
1188 &self,
1189 native_fn: &'static NativeFunction,
1190 this: Option<RawVc>,
1191 arg: Box<dyn MagicAny>,
1192 persistence: TaskPersistence,
1193 ) -> RawVc {
1194 self.native_call(native_fn, this, arg, persistence)
1195 }
1196 fn trait_call(
1197 &self,
1198 trait_method: &'static TraitMethod,
1199 this: RawVc,
1200 arg: Box<dyn MagicAny>,
1201 persistence: TaskPersistence,
1202 ) -> RawVc {
1203 self.trait_call(trait_method, this, arg, persistence)
1204 }
1205
1206 #[track_caller]
1207 fn run_once(
1208 &self,
1209 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1210 ) -> TaskId {
1211 self.spawn_once_task(async move {
1212 future.await?;
1213 Ok(Completion::new())
1214 })
1215 }
1216
1217 #[track_caller]
1218 fn run_once_with_reason(
1219 &self,
1220 reason: StaticOrArc<dyn InvalidationReason>,
1221 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1222 ) -> TaskId {
1223 {
1224 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1225 reason_set.insert(reason);
1226 }
1227 self.spawn_once_task(async move {
1228 future.await?;
1229 Ok(Completion::new())
1230 })
1231 }
1232
1233 #[track_caller]
1234 fn run_once_process(
1235 &self,
1236 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1237 ) -> TaskId {
1238 let this = self.pin();
1239 self.spawn_once_task(async move {
1240 this.finish_primary_job();
1241 future.await?;
1242 this.begin_primary_job();
1243 Ok(Completion::new())
1244 })
1245 }
1246}
1247
1248impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1249 #[instrument(level = Level::INFO, skip_all, name = "invalidate")]
1250 fn invalidate(&self, task: TaskId) {
1251 self.backend.invalidate_task(task, self);
1252 }
1253
1254 #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))]
1255 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1256 {
1257 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1258 reason_set.insert(reason);
1259 }
1260 self.backend.invalidate_task(task, self);
1261 }
1262
1263 fn invalidate_serialization(&self, task: TaskId) {
1264 self.backend.invalidate_serialization(task, self);
1265 }
1266
1267 fn notify_scheduled_tasks(&self) {
1268 let _ = CURRENT_TASK_STATE.try_with(|cell| {
1269 let tasks = {
1270 let CurrentTaskState {
1271 tasks_to_notify, ..
1272 } = &mut *cell.write().unwrap();
1273 take(tasks_to_notify)
1274 };
1275 if tasks.is_empty() {
1276 return;
1277 }
1278 self.backend.invalidate_tasks(&tasks, self);
1279 });
1280 }
1281
1282 fn try_read_task_output(
1283 &self,
1284 task: TaskId,
1285 consistency: ReadConsistency,
1286 ) -> Result<Result<RawVc, EventListener>> {
1287 self.backend
1288 .try_read_task_output(task, current_task("reading Vcs"), consistency, self)
1289 }
1290
1291 fn try_read_task_output_untracked(
1292 &self,
1293 task: TaskId,
1294 consistency: ReadConsistency,
1295 ) -> Result<Result<RawVc, EventListener>> {
1296 self.backend
1297 .try_read_task_output_untracked(task, consistency, self)
1298 }
1299
1300 fn try_read_task_cell(
1301 &self,
1302 task: TaskId,
1303 index: CellId,
1304 options: ReadCellOptions,
1305 ) -> Result<Result<TypedCellContent, EventListener>> {
1306 self.backend
1307 .try_read_task_cell(task, index, current_task("reading Vcs"), options, self)
1308 }
1309
1310 fn try_read_task_cell_untracked(
1311 &self,
1312 task: TaskId,
1313 index: CellId,
1314 options: ReadCellOptions,
1315 ) -> Result<Result<TypedCellContent, EventListener>> {
1316 self.backend
1317 .try_read_task_cell_untracked(task, index, options, self)
1318 }
1319
1320 fn try_read_own_task_cell_untracked(
1321 &self,
1322 current_task: TaskId,
1323 index: CellId,
1324 options: ReadCellOptions,
1325 ) -> Result<TypedCellContent> {
1326 self.backend
1327 .try_read_own_task_cell_untracked(current_task, index, options, self)
1328 }
1329
1330 fn try_read_local_output(
1331 &self,
1332 execution_id: ExecutionId,
1333 local_task_id: LocalTaskId,
1334 ) -> Result<Result<RawVc, EventListener>> {
1335 CURRENT_TASK_STATE.with(|gts| {
1336 let gts_read = gts.read().unwrap();
1337
1338 gts_read.assert_execution_id(execution_id);
1343
1344 match gts_read.get_local_task(local_task_id) {
1345 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1346 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1347 }
1348 })
1349 }
1350
1351 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1352 self.backend.read_task_collectibles(
1353 task,
1354 trait_id,
1355 current_task("reading collectibles"),
1356 self,
1357 )
1358 }
1359
1360 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1361 self.backend.emit_collectible(
1362 trait_type,
1363 collectible,
1364 current_task("emitting collectible"),
1365 self,
1366 );
1367 }
1368
1369 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1370 self.backend.unemit_collectible(
1371 trait_type,
1372 collectible,
1373 count,
1374 current_task("emitting collectible"),
1375 self,
1376 );
1377 }
1378
1379 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1380 for (&collectible, &count) in collectibles {
1381 if count > 0 {
1382 self.backend.unemit_collectible(
1383 trait_type,
1384 collectible,
1385 count as u32,
1386 current_task("emitting collectible"),
1387 self,
1388 );
1389 }
1390 }
1391 }
1392
1393 fn read_own_task_cell(
1394 &self,
1395 task: TaskId,
1396 index: CellId,
1397 options: ReadCellOptions,
1398 ) -> Result<TypedCellContent> {
1399 self.try_read_own_task_cell_untracked(task, index, options)
1401 }
1402
1403 fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
1404 self.backend.update_task_cell(task, index, content, self);
1405 }
1406
1407 fn connect_task(&self, task: TaskId) {
1408 self.backend
1409 .connect_task(task, current_task("connecting task"), self);
1410 }
1411
1412 fn mark_own_task_as_finished(&self, task: TaskId) {
1413 self.backend.mark_own_task_as_finished(task, self);
1414 }
1415
1416 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1417 self.backend
1418 .set_own_task_aggregation_number(task, aggregation_number, self);
1419 }
1420
1421 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1422 self.backend.mark_own_task_as_session_dependent(task, self);
1423 }
1424
1425 fn detached_for_testing(
1428 &self,
1429 fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1430 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1431 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1434 let tracked_fut = {
1435 let ts = global_task_state.read().unwrap();
1436 ts.local_task_tracker.track_future(fut)
1437 };
1438 Box::pin(TURBO_TASKS.scope(
1439 turbo_tasks(),
1440 CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1441 ))
1442 }
1443
1444 fn task_statistics(&self) -> &TaskStatisticsApi {
1445 self.backend.task_statistics()
1446 }
1447
1448 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1449 let this = self.pin();
1450 Box::pin(async move {
1451 this.stop_and_wait().await;
1452 })
1453 }
1454
1455 fn subscribe_to_compilation_events(
1456 &self,
1457 event_types: Option<Vec<String>>,
1458 ) -> Receiver<Arc<dyn CompilationEvent>> {
1459 self.compilation_events.subscribe(event_types)
1460 }
1461
1462 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1463 if let Err(e) = self.compilation_events.send(event) {
1464 tracing::warn!("Failed to send compilation event: {e}");
1465 }
1466 }
1467}
1468
1469impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1470 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1471 self.pin()
1472 }
1473 fn backend(&self) -> &B {
1474 &self.backend
1475 }
1476
1477 #[track_caller]
1478 fn schedule_backend_background_job(&self, id: BackendJobId) {
1479 self.schedule_background_job(move |this| async move {
1480 this.backend.run_backend_job(id, &*this).await;
1481 })
1482 }
1483
1484 #[track_caller]
1485 fn schedule_backend_foreground_job(&self, id: BackendJobId) {
1486 self.schedule_foreground_job(move |this| async move {
1487 this.backend.run_backend_job(id, &*this).await;
1488 })
1489 }
1490
1491 fn try_foreground_done(&self) -> Result<(), EventListener> {
1492 if self
1493 .currently_scheduled_foreground_jobs
1494 .load(Ordering::Acquire)
1495 == 0
1496 {
1497 return Ok(());
1498 }
1499 let listener = self.event_foreground.listen();
1500 if self
1501 .currently_scheduled_foreground_jobs
1502 .load(Ordering::Acquire)
1503 == 0
1504 {
1505 return Ok(());
1506 }
1507 Err(listener)
1508 }
1509
1510 fn wait_foreground_done_excluding_own<'a>(
1511 &'a self,
1512 ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>> {
1513 if self
1514 .currently_scheduled_foreground_jobs
1515 .load(Ordering::Acquire)
1516 == 0
1517 {
1518 return None;
1519 }
1520 Some(Box::pin(async {
1521 self.finish_foreground_job();
1522 self.wait_foreground_done().await;
1523 self.begin_foreground_job();
1524 }))
1525 }
1526
1527 fn schedule_notify_tasks(&self, tasks: &[TaskId]) {
1530 let result = CURRENT_TASK_STATE.try_with(|cell| {
1531 let CurrentTaskState {
1532 tasks_to_notify, ..
1533 } = &mut *cell.write().unwrap();
1534 tasks_to_notify.extend(tasks.iter().copied());
1535 });
1536 if result.is_err() {
1537 let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered();
1538 self.backend.invalidate_tasks(tasks, self);
1539 }
1540 }
1541
1542 fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) {
1545 let result = CURRENT_TASK_STATE.try_with(|cell| {
1546 let CurrentTaskState {
1547 tasks_to_notify, ..
1548 } = &mut *cell.write().unwrap();
1549 tasks_to_notify.extend(tasks.iter().copied());
1550 });
1551 if result.is_err() {
1552 let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered();
1553 self.backend.invalidate_tasks_set(tasks, self);
1554 };
1555 }
1556
1557 #[track_caller]
1558 fn schedule(&self, task: TaskId) {
1559 self.schedule(task)
1560 }
1561
1562 fn program_duration_until(&self, instant: Instant) -> Duration {
1563 instant - self.program_start
1564 }
1565
1566 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1567 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1569 }
1570
1571 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1572 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1574 }
1575
1576 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1577 unsafe { self.task_id_factory.reuse(id.into()) }
1578 }
1579
1580 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1581 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1582 }
1583
1584 fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState)) {
1585 CURRENT_TASK_STATE
1586 .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
1587 }
1588
1589 fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState)) {
1590 CURRENT_TASK_STATE
1591 .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
1592 }
1593
1594 fn is_idle(&self) -> bool {
1595 self.currently_scheduled_tasks.load(Ordering::Acquire) == 0
1596 }
1597}
1598
1599pub(crate) fn current_task(from: &str) -> TaskId {
1600 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1601 Ok(id) => id,
1602 Err(_) => panic!("{from} can only be used in the context of turbo_tasks task execution"),
1603 }
1604}
1605
1606pub async fn run_once<T: Send + 'static>(
1607 tt: Arc<dyn TurboTasksApi>,
1608 future: impl Future<Output = Result<T>> + Send + 'static,
1609) -> Result<T> {
1610 let (tx, rx) = tokio::sync::oneshot::channel();
1611
1612 let task_id = tt.run_once(Box::pin(async move {
1613 let result = future.await?;
1614 tx.send(result)
1615 .map_err(|_| anyhow!("unable to send result"))?;
1616 Ok(())
1617 }));
1618
1619 let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1622 let raw_future = raw_result.into_read().untracked();
1623 turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1624
1625 Ok(rx.await?)
1626}
1627
1628pub async fn run_once_with_reason<T: Send + 'static>(
1629 tt: Arc<dyn TurboTasksApi>,
1630 reason: impl InvalidationReason,
1631 future: impl Future<Output = Result<T>> + Send + 'static,
1632) -> Result<T> {
1633 let (tx, rx) = tokio::sync::oneshot::channel();
1634
1635 let task_id = tt.run_once_with_reason(
1636 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1637 Box::pin(async move {
1638 let result = future.await?;
1639 tx.send(result)
1640 .map_err(|_| anyhow!("unable to send result"))?;
1641 Ok(())
1642 }),
1643 );
1644
1645 let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1648 let raw_future = raw_result.into_read().untracked();
1649 turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1650
1651 Ok(rx.await?)
1652}
1653
1654pub fn dynamic_call(
1656 func: &'static NativeFunction,
1657 this: Option<RawVc>,
1658 arg: Box<dyn MagicAny>,
1659 persistence: TaskPersistence,
1660) -> RawVc {
1661 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1662}
1663
1664pub fn trait_call(
1666 trait_method: &'static TraitMethod,
1667 this: RawVc,
1668 arg: Box<dyn MagicAny>,
1669 persistence: TaskPersistence,
1670) -> RawVc {
1671 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1672}
1673
1674pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1675 TURBO_TASKS.with(|arc| arc.clone())
1676}
1677
1678pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1679 TURBO_TASKS.with(|arc| func(arc))
1680}
1681
1682pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1683 TURBO_TASKS.sync_scope(tt, f)
1684}
1685
1686pub fn turbo_tasks_future_scope<T>(
1687 tt: Arc<dyn TurboTasksApi>,
1688 f: impl Future<Output = T>,
1689) -> impl Future<Output = T> {
1690 TURBO_TASKS.scope(tt, f)
1691}
1692
1693pub fn with_turbo_tasks_for_testing<T>(
1694 tt: Arc<dyn TurboTasksApi>,
1695 current_task: TaskId,
1696 execution_id: ExecutionId,
1697 f: impl Future<Output = T>,
1698) -> impl Future<Output = T> {
1699 TURBO_TASKS.scope(
1700 tt,
1701 CURRENT_TASK_STATE.scope(
1702 Arc::new(RwLock::new(CurrentTaskState::new(
1703 current_task,
1704 execution_id,
1705 Box::new(()),
1706 ))),
1707 f,
1708 ),
1709 )
1710}
1711
1712pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1717 tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1718}
1719
1720pub fn current_task_for_testing() -> TaskId {
1721 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1722}
1723
1724pub fn mark_session_dependent() {
1726 with_turbo_tasks(|tt| {
1727 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1728 });
1729}
1730
1731pub fn mark_root() {
1734 with_turbo_tasks(|tt| {
1735 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1736 });
1737}
1738
1739pub fn mark_finished() {
1742 with_turbo_tasks(|tt| {
1743 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1744 });
1745}
1746
1747pub fn mark_stateful() -> SerializationInvalidator {
1753 CURRENT_TASK_STATE.with(|cell| {
1754 let CurrentTaskState {
1755 stateful, task_id, ..
1756 } = &mut *cell.write().unwrap();
1757 *stateful = true;
1758 SerializationInvalidator::new(*task_id)
1759 })
1760}
1761
1762pub fn mark_invalidator() {
1763 CURRENT_TASK_STATE.with(|cell| {
1764 let CurrentTaskState {
1765 has_invalidator, ..
1766 } = &mut *cell.write().unwrap();
1767 *has_invalidator = true;
1768 })
1769}
1770
1771pub fn prevent_gc() {
1772 mark_stateful();
1774}
1775
1776pub fn notify_scheduled_tasks() {
1778 with_turbo_tasks(|tt| tt.notify_scheduled_tasks())
1779}
1780
1781pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1782 with_turbo_tasks(|tt| {
1783 let raw_vc = collectible.node.node;
1784 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1785 })
1786}
1787
1788pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
1789 let turbo_tasks = turbo_tasks();
1790 let span = Span::current();
1791 let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
1792 let _guard = span.entered();
1793 let start = Instant::now();
1794 let start_allocations = TurboMalloc::allocation_counters();
1795 let r = turbo_tasks_scope(turbo_tasks, func);
1796 (r, start.elapsed(), start_allocations.until_now())
1797 })
1798 .await
1799 .unwrap();
1800 capture_future::add_duration(duration);
1801 capture_future::add_allocation_info(alloc_info);
1802 result
1803}
1804
1805pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
1806 let handle = Handle::current();
1807 let span = info_span!("thread").or_current();
1808 thread::spawn(move || {
1809 let span = span.entered();
1810 let guard = handle.enter();
1811 func();
1812 drop(guard);
1813 drop(span);
1814 });
1815}
1816
1817pub(crate) async fn read_task_output(
1818 this: &dyn TurboTasksApi,
1819 id: TaskId,
1820 consistency: ReadConsistency,
1821) -> Result<RawVc> {
1822 loop {
1823 match this.try_read_task_output(id, consistency)? {
1824 Ok(result) => return Ok(result),
1825 Err(listener) => listener.await,
1826 }
1827 }
1828}
1829
1830pub(crate) async fn read_task_output_untracked(
1833 this: &dyn TurboTasksApi,
1834 id: TaskId,
1835 consistency: ReadConsistency,
1836) -> Result<RawVc> {
1837 loop {
1838 match this.try_read_task_output_untracked(id, consistency)? {
1839 Ok(result) => return Ok(result),
1840 Err(listener) => listener.await,
1841 }
1842 }
1843}
1844
1845pub(crate) async fn read_task_cell(
1846 this: &dyn TurboTasksApi,
1847 id: TaskId,
1848 index: CellId,
1849 options: ReadCellOptions,
1850) -> Result<TypedCellContent> {
1851 loop {
1852 match this.try_read_task_cell(id, index, options)? {
1853 Ok(result) => return Ok(result),
1854 Err(listener) => listener.await,
1855 }
1856 }
1857}
1858
1859#[derive(Clone, Copy, Serialize, Deserialize)]
1865pub struct CurrentCellRef {
1866 current_task: TaskId,
1867 index: CellId,
1868}
1869
1870type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1871
1872impl CurrentCellRef {
1873 pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1875 where
1876 T: VcValueType,
1877 {
1878 self.conditional_update_with_shared_reference(|old_shared_reference| {
1879 let old_ref = old_shared_reference
1880 .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1881 .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1882 let new_value = functor(old_ref)?;
1883 Some(SharedReference::new(triomphe::Arc::new(
1884 <T::Read as VcRead<T>>::value_to_repr(new_value),
1885 )))
1886 })
1887 }
1888
1889 pub fn conditional_update_with_shared_reference(
1891 &self,
1892 functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1893 ) {
1894 let tt = turbo_tasks();
1895 let cell_content = tt
1896 .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
1897 .ok();
1898 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1899 if let Some(update) = update {
1900 tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1901 }
1902 }
1903
1904 pub fn compare_and_update<T>(&self, new_value: T)
1938 where
1939 T: PartialEq + VcValueType,
1940 {
1941 self.conditional_update(|old_value| {
1942 if let Some(old_value) = old_value
1943 && old_value == &new_value
1944 {
1945 return None;
1946 }
1947 Some(new_value)
1948 });
1949 }
1950
1951 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1960 where
1961 T: VcValueType + PartialEq,
1962 {
1963 fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1964 <T::Read as VcRead<T>>::repr_to_value_ref(
1965 sr.0.downcast_ref::<VcReadRepr<T>>()
1966 .expect("cannot update SharedReference of different type"),
1967 )
1968 }
1969 self.conditional_update_with_shared_reference(|old_sr| {
1970 if let Some(old_sr) = old_sr {
1971 let old_value: &T = extract_sr_value(old_sr);
1972 let new_value = extract_sr_value(&new_shared_reference);
1973 if old_value == new_value {
1974 return None;
1975 }
1976 }
1977 Some(new_shared_reference)
1978 });
1979 }
1980
1981 pub fn update<T>(&self, new_value: T)
1983 where
1984 T: VcValueType,
1985 {
1986 let tt = turbo_tasks();
1987 tt.update_own_task_cell(
1988 self.current_task,
1989 self.index,
1990 CellContent(Some(SharedReference::new(triomphe::Arc::new(
1991 <T::Read as VcRead<T>>::value_to_repr(new_value),
1992 )))),
1993 )
1994 }
1995
1996 pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
2005 let tt = turbo_tasks();
2006 let content = tt
2007 .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
2008 .ok();
2009 let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
2010 shared_ref_exp != shared_ref
2012 } else {
2013 true
2014 };
2015 if update {
2016 tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
2017 }
2018 }
2019}
2020
2021impl From<CurrentCellRef> for RawVc {
2022 fn from(cell: CurrentCellRef) -> Self {
2023 RawVc::TaskCell(cell.current_task, cell.index)
2024 }
2025}
2026
2027pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
2028 CURRENT_TASK_STATE.with(|ts| {
2029 let current_task = current_task("celling turbo_tasks values");
2030 let mut ts = ts.write().unwrap();
2031 let map = ts.cell_counters.as_mut().unwrap();
2032 let current_index = map.entry(ty).or_default();
2033 let index = *current_index;
2034 *current_index += 1;
2035 CurrentCellRef {
2036 current_task,
2037 index: CellId { type_id: ty, index },
2038 }
2039 })
2040}
2041
2042pub(crate) async fn read_local_output(
2043 this: &dyn TurboTasksApi,
2044 execution_id: ExecutionId,
2045 local_task_id: LocalTaskId,
2046) -> Result<RawVc> {
2047 loop {
2048 match this.try_read_local_output(execution_id, local_task_id)? {
2049 Ok(raw_vc) => return Ok(raw_vc),
2050 Err(event_listener) => event_listener.await,
2051 }
2052 }
2053}