use std::{
    any::Any,
    borrow::Cow,
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    pin::Pin,
    sync::{
        Arc, Mutex, RwLock, Weak,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    thread,
    time::{Duration, Instant},
};

use anyhow::{Result, anyhow};
use auto_hash_map::AutoMap;
use rustc_hash::FxHasher;
use serde::{Deserialize, Serialize};
use tokio::{runtime::Handle, select, sync::mpsc::Receiver, task_local};
use tokio_util::task::TaskTracker;
use tracing::{Instrument, Level, Span, info_span, instrument, trace_span};
use turbo_tasks_malloc::TurboMalloc;

use crate::{
    Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
    ResolvedVc, SharedReference, TaskId, TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait,
    VcValueType,
    backend::{
        Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
        TransientTaskType, TurboTasksExecutionError, TypedCellContent,
    },
    capture_future::{self, CaptureFuture},
    event::{Event, EventListener},
    id::{BackendJobId, ExecutionId, FunctionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
    id_factory::IdFactoryWithReuse,
    magic_any::MagicAny,
    message_queue::{CompilationEvent, CompilationEventQueue},
    raw_vc::{CellId, RawVc},
    registry,
    serialization_invalidation::SerializationInvalidator,
    task::local_task::{LocalTask, LocalTaskType},
    task_statistics::TaskStatisticsApi,
    trace::TraceRawVcs,
    trait_helpers::get_trait_method,
    util::{IdFactory, SharedError, StaticOrArc},
    vc::ReadVcFuture,
};

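/// The dyn-compatible subset of the turbo-tasks API used to call turbo-tasks
/// functions (cached or uncached) and to spawn one-off tasks.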
pub trait TurboTasksCallApi: Sync + Send {
    fn dynamic_call(
        &self,
        func: FunctionId,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    fn native_call(
        &self,
        func: FunctionId,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;
    fn trait_call(
        &self,
        trait_type: TraitTypeId,
        trait_fn_name: Cow<'static, str>,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc;

    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId;
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId;
    fn run_once_process(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId;
}

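/// The core dyn-compatible turbo-tasks API: task invalidation, reading task
/// outputs and cells, collectibles, and lifecycle helpers. Task code reaches
/// it through the `TURBO_TASKS` task-local (see [`turbo_tasks()`]).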
pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
    fn pin(&self) -> Arc<dyn TurboTasksApi>;

    fn invalidate(&self, task: TaskId);
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

    fn invalidate_serialization(&self, task: TaskId);

    fn notify_scheduled_tasks(&self);

    fn try_read_task_output(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
    ) -> Result<Result<RawVc, EventListener>>;

    fn try_read_task_output_untracked(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
    ) -> Result<Result<RawVc, EventListener>>;

    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    fn try_read_task_cell_untracked(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>>;

    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);

    fn try_read_own_task_cell_untracked(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;

    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent>;
    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
    fn mark_own_task_as_finished(&self, task: TaskId);
    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
    fn mark_own_task_as_session_dependent(&self, task: TaskId);

    fn connect_task(&self, task: TaskId);

    fn detached_for_testing(
        &self,
        f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;

    fn task_statistics(&self) -> &TaskStatisticsApi;

    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>>;
    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
}

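/// A value that is unused by convention. Creating one (`new_unchecked`) and
/// borrowing the inner value (`get_unchecked`) are `unsafe` because the caller
/// must guarantee the value really is unused; `into` safely consumes the
/// wrapper and hands ownership back.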
pub struct Unused<T> {
    inner: T,
}

impl<T> Unused<T> {
    pub unsafe fn new_unchecked(inner: T) -> Self {
        Self { inner }
    }

    pub unsafe fn get_unchecked(&self) -> &T {
        &self.inner
    }

    pub fn into(self) -> T {
        self.inner
    }
}

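/// The interface that [`Backend`] implementations use to talk back to the
/// turbo-tasks manager: task id allocation and reuse, scheduling, background
/// and foreground job bookkeeping, and access to per-task backend state.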
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
    fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

    fn schedule(&self, task: TaskId);
    fn schedule_backend_background_job(&self, id: BackendJobId);
    fn schedule_backend_foreground_job(&self, id: BackendJobId);

    fn try_foreground_done(&self) -> Result<(), EventListener>;
    fn wait_foreground_done_excluding_own<'a>(
        &'a self,
    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>>;

    fn schedule_notify_tasks(&self, tasks: &[TaskId]);

    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet);

    fn program_duration_until(&self, instant: Instant) -> Duration;

    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState));

    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState));

    fn is_idle(&self) -> bool;

    fn backend(&self) -> &B;
}

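/// Convenience extension over [`TurboTasksBackendApi`]: wraps the dyn-compatible
/// `*_dyn` methods so callers can pass a generic closure that returns a value.
///
/// ```ignore
/// // Minimal sketch, assuming `tt: &dyn TurboTasksBackendApi<B>` is in scope and
/// // that `B::TaskState` has a hypothetical `counter: u32` field.
/// let counter = tt.read_task_state(|state| state.counter);
/// ```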
pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
    fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
        let mut func = Some(func);
        let mut out = None;
        self.read_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
        out.expect("read_task_state_dyn must call `func`")
    }

    fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
        let mut func = Some(func);
        let mut out = None;
        self.write_task_state_dyn(&mut |ts| out = Some((func.take().unwrap())(ts)));
        out.expect("write_task_state_dyn must call `func`")
    }
}

impl<TT, B> TurboTasksBackendApiExt<B> for TT
where
    TT: TurboTasksBackendApi<B> + ?Sized,
    B: Backend + 'static,
{
}

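/// Aggregated information about a completed batch of task executions: how long
/// it took, how many tasks ran, and which invalidation reasons triggered it.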
#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
    pub duration: Duration,
    pub tasks: usize,
    pub reasons: InvalidationReasonSet,
    #[allow(dead_code)]
    placeholder_for_future_fields: (),
}

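/// How a task (or function call) is stored: `Persistent` tasks are cached and
/// may be persisted by the backend, `Transient` tasks are cached only for the
/// lifetime of this process, and `Local` calls are not cached at all and live
/// only within the current execution of their parent task.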
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum TaskPersistence {
    Persistent,

    Transient,

    Local,
}

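/// Consistency level for reading a task's output: `Eventual` returns whatever
/// output is currently available, while `Strong` waits until the task and its
/// dependencies have settled before returning.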
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadConsistency {
    Eventual,
    Strong,
}

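/// The turbo-tasks manager: owns the [`Backend`], allocates task and execution
/// ids, schedules task executions on tokio, and tracks the counts of scheduled,
/// foreground, and background jobs for idle detection and update aggregation.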
pub struct TurboTasks<B: Backend + 'static> {
    this: Weak<Self>,
    backend: B,
    task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,
    execution_id_factory: IdFactory<ExecutionId>,
    stopped: AtomicBool,
    currently_scheduled_tasks: AtomicUsize,
    currently_scheduled_foreground_jobs: AtomicUsize,
    currently_scheduled_background_jobs: AtomicUsize,
    scheduled_tasks: AtomicUsize,
    start: Mutex<Option<Instant>>,
    aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
    event: Event,
    event_start: Event,
    event_foreground: Event,
    event_background: Event,
    program_start: Instant,
    compilation_events: CompilationEventQueue,
}

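/// Mutable state of the task execution currently running on this tokio task,
/// stored in the `CURRENT_TASK_STATE` task-local.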
struct CurrentTaskState {
    task_id: TaskId,
    execution_id: ExecutionId,

    tasks_to_notify: Vec<TaskId>,

    stateful: bool,

    cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,

    local_tasks: Vec<LocalTask>,

    local_task_tracker: TaskTracker,

    backend_state: Box<dyn Any + Send + Sync>,
}

impl CurrentTaskState {
    fn new(
        task_id: TaskId,
        execution_id: ExecutionId,
        backend_state: Box<dyn Any + Send + Sync>,
    ) -> Self {
        Self {
            task_id,
            execution_id,
            tasks_to_notify: Vec::new(),
            stateful: false,
            cell_counters: Some(AutoMap::default()),
            local_tasks: Vec::new(),
            local_task_tracker: TaskTracker::new(),
            backend_state,
        }
    }

    fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
        if self.execution_id != expected_execution_id {
            panic!(
                "Local tasks can only be scheduled/awaited within the same execution of the \
                 parent task that created them"
            );
        }
    }

    fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
        self.local_tasks.push(local_task);
        if cfg!(debug_assertions) {
            LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
        } else {
            unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
        }
    }

    fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
        &self.local_tasks[(*local_task_id as usize) - 1]
    }

    fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
        &mut self.local_tasks[(*local_task_id as usize) - 1]
    }
}

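// Task-locals that give task code access to the turbo-tasks instance that
// spawned it and to the state of the currently executing task.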
task_local! {
    static TURBO_TASKS: Arc<dyn TurboTasksApi>;

    static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
}

impl<B: Backend + 'static> TurboTasks<B> {
    pub fn new(backend: B) -> Arc<Self> {
        let task_id_factory = IdFactoryWithReuse::new(
            TaskId::MIN,
            TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
        );
        let transient_task_id_factory =
            IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
        let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
        let this = Arc::new_cyclic(|this| Self {
            this: this.clone(),
            backend,
            task_id_factory,
            transient_task_id_factory,
            execution_id_factory,
            stopped: AtomicBool::new(false),
            currently_scheduled_tasks: AtomicUsize::new(0),
            currently_scheduled_background_jobs: AtomicUsize::new(0),
            currently_scheduled_foreground_jobs: AtomicUsize::new(0),
            scheduled_tasks: AtomicUsize::new(0),
            start: Default::default(),
            aggregated_update: Default::default(),
            event: Event::new(|| "TurboTasks::event".to_string()),
            event_start: Event::new(|| "TurboTasks::event_start".to_string()),
            event_foreground: Event::new(|| "TurboTasks::event_foreground".to_string()),
            event_background: Event::new(|| "TurboTasks::event_background".to_string()),
            program_start: Instant::now(),
            compilation_events: CompilationEventQueue::default(),
        });
        this.backend.startup(&*this);
        this
    }

    pub fn pin(&self) -> Arc<Self> {
        self.this.upgrade().unwrap()
    }

    pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
    where
        T: ?Sized,
        F: Fn() -> Fut + Send + Sync + Clone + 'static,
        Fut: Future<Output = Result<Vc<T>>> + Send,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Root(Box::new(move || {
                let functor = functor.clone();
                Box::pin(async move {
                    let raw_vc = functor().await?.node;
                    raw_vc.to_non_local().await
                })
            })),
            self,
        );
        self.schedule(id);
        id
    }

    pub fn dispose_root_task(&self, task_id: TaskId) {
        self.backend.dispose_root_task(task_id, self);
    }

    #[track_caller]
    pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
    where
        T: ?Sized,
        Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
    {
        let id = self.backend.create_transient_task(
            TransientTaskType::Once(Box::pin(async move {
                let raw_vc = future.await?.node;
                raw_vc.to_non_local().await
            })),
            self,
        );
        self.schedule(id);
        id
    }

    pub async fn run_once<T: TraceRawVcs + Send + 'static>(
        &self,
        future: impl Future<Output = Result<T>> + Send + 'static,
    ) -> Result<T> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        let task_id = self.spawn_once_task(async move {
            let result = future.await?;
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(Completion::new())
        });
        let raw_result =
            read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
        turbo_tasks_future_scope(
            self.pin(),
            ReadVcFuture::<Completion>::from(raw_result.into_read().untracked()),
        )
        .await?;

        Ok(rx.await?)
    }

    pub(crate) fn native_call(
        &self,
        fn_type: FunctionId,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        match persistence {
            TaskPersistence::Local => {
                let task_type = LocalTaskType::Native { fn_type, this, arg };
                self.schedule_local_task(task_type, persistence)
            }
            TaskPersistence::Transient => {
                let task_type = CachedTaskType { fn_type, this, arg };
                RawVc::TaskOutput(self.backend.get_or_create_transient_task(
                    task_type,
                    current_task("turbo_function calls"),
                    self,
                ))
            }
            TaskPersistence::Persistent => {
                let task_type = CachedTaskType { fn_type, this, arg };
                RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
                    task_type,
                    current_task("turbo_function calls"),
                    self,
                ))
            }
        }
    }

    pub fn dynamic_call(
        &self,
        fn_type: FunctionId,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        if this.is_none_or(|this| this.is_resolved())
            && registry::get_function(fn_type).arg_meta.is_resolved(&*arg)
        {
            return self.native_call(fn_type, this, arg, persistence);
        }
        let task_type = LocalTaskType::ResolveNative { fn_type, this, arg };
        self.schedule_local_task(task_type, persistence)
    }

    pub fn trait_call(
        &self,
        trait_type: TraitTypeId,
        mut trait_fn_name: Cow<'static, str>,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
            match get_trait_method(trait_type, type_id, trait_fn_name) {
                Ok(native_fn) => {
                    let arg = registry::get_function(native_fn).arg_meta.filter_owned(arg);
                    return self.dynamic_call(native_fn, Some(this), arg, persistence);
                }
                Err(name) => {
                    trait_fn_name = name;
                }
            }
        }

        let task_type = LocalTaskType::ResolveTrait {
            trait_type,
            method_name: trait_fn_name,
            this,
            arg,
        };

        self.schedule_local_task(task_type, persistence)
    }

    #[track_caller]
    pub(crate) fn schedule(&self, task_id: TaskId) {
        self.begin_primary_job();
        self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);

        let this = self.pin();
        let future = async move {
            let mut schedule_again = true;
            while schedule_again {
                let backend_state = this.backend.new_task_state(task_id);
                let execution_id = this.execution_id_factory.wrapping_get();
                let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
                    task_id,
                    execution_id,
                    Box::new(backend_state),
                )));
                let single_execution_future = async {
                    if this.stopped.load(Ordering::Acquire) {
                        this.backend.task_execution_canceled(task_id, &*this);
                        return false;
                    }

                    let Some(TaskExecutionSpec { future, span }) =
                        this.backend.try_start_task_execution(task_id, &*this)
                    else {
                        return false;
                    };

                    async {
                        let (result, duration, memory_usage) = CaptureFuture::new(future).await;

                        let ltt = CURRENT_TASK_STATE
                            .with(|ts| ts.read().unwrap().local_task_tracker.clone());
                        ltt.close();
                        ltt.wait().await;

                        let result = match result {
                            Ok(Ok(raw_vc)) => Ok(raw_vc),
                            Ok(Err(err)) => Err(Arc::new(err.into())),
                            Err(err) => {
                                Err(Arc::new(TurboTasksExecutionError::Panic(Arc::new(err))))
                            }
                        };

                        this.backend.task_execution_result(task_id, result, &*this);
                        let stateful = this.finish_current_task_state();
                        let cell_counters = CURRENT_TASK_STATE
                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
                        let schedule_again = this.backend.task_execution_completed(
                            task_id,
                            duration,
                            memory_usage,
                            &cell_counters,
                            stateful,
                            &*this,
                        );
                        this.notify_scheduled_tasks();
                        schedule_again
                    }
                    .instrument(span)
                    .await
                };
                schedule_again = CURRENT_TASK_STATE
                    .scope(current_task_state, single_execution_future)
                    .await;
            }
            this.finish_primary_job();
            anyhow::Ok(())
        };

        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

        #[cfg(feature = "tokio_tracing")]
        {
            let description = self.backend.get_task_description(task_id);
            tokio::task::Builder::new()
                .name(&description)
                .spawn(future)
                .unwrap();
        }
        #[cfg(not(feature = "tokio_tracing"))]
        tokio::task::spawn(future);
    }

    fn schedule_local_task(
        &self,
        ty: LocalTaskType,
        persistence: TaskPersistence,
    ) -> RawVc {
        let ty = Arc::new(ty);
        let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
            .with(|gts| {
                let mut gts_write = gts.write().unwrap();
                let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
                    done_event: Event::new({
                        let ty = Arc::clone(&ty);
                        move || format!("LocalTask({ty})::done_event")
                    }),
                });
                (
                    Arc::clone(gts),
                    gts_write.task_id,
                    gts_write.execution_id,
                    local_task_id,
                )
            });

        #[cfg(feature = "tokio_tracing")]
        let description = format!(
            "[local] (parent: {}) {}",
            self.backend.get_task_description(parent_task_id),
            ty,
        );
        #[cfg(not(feature = "tokio_tracing"))]
        let _ = parent_task_id;

        let this = self.pin();
        let future = async move {
            let TaskExecutionSpec { future, span } =
                crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
            let ty = ty.clone();
            async move {
                let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;

                let result = match result {
                    Ok(Ok(raw_vc)) => Ok(raw_vc),
                    Ok(Err(err)) => Err(Arc::new(err.into())),
                    Err(err) => Err(Arc::new(TurboTasksExecutionError::Panic(Arc::new(err)))),
                };

                let local_task = LocalTask::Done {
                    output: match result {
                        Ok(raw_vc) => OutputContent::Link(raw_vc),
                        Err(err) => match &*err {
                            TurboTasksExecutionError::Error { .. } => {
                                OutputContent::Error(SharedError::new(
                                    anyhow::Error::new(err)
                                        .context(format!("Execution of {ty} failed")),
                                ))
                            }
                            TurboTasksExecutionError::Panic(err) => {
                                OutputContent::Panic(err.clone())
                            }
                        },
                    },
                };

                let done_event = CURRENT_TASK_STATE.with(move |gts| {
                    let mut gts_write = gts.write().unwrap();
                    let scheduled_task =
                        std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
                    let LocalTask::Scheduled { done_event } = scheduled_task else {
                        panic!("local task finished, but was not in the scheduled state?");
                    };
                    done_event
                });
                done_event.notify(usize::MAX)
            }
            .instrument(span)
            .await
        };
        let future = global_task_state
            .read()
            .unwrap()
            .local_task_tracker
            .track_future(future);
        let future = CURRENT_TASK_STATE.scope(global_task_state, future);
        let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

        #[cfg(feature = "tokio_tracing")]
        tokio::task::Builder::new()
            .name(&description)
            .spawn(future)
            .unwrap();
        #[cfg(not(feature = "tokio_tracing"))]
        tokio::task::spawn(future);

        RawVc::LocalOutput(execution_id, local_task_id, persistence)
    }

    fn begin_primary_job(&self) {
        if self
            .currently_scheduled_tasks
            .fetch_add(1, Ordering::AcqRel)
            == 0
        {
            *self.start.lock().unwrap() = Some(Instant::now());
            self.event_start.notify(usize::MAX);
            self.backend.idle_end(self);
        }
    }

    fn begin_foreground_job(&self) {
        self.begin_primary_job();
        self.currently_scheduled_foreground_jobs
            .fetch_add(1, Ordering::AcqRel);
    }

    fn finish_primary_job(&self) {
        if self
            .currently_scheduled_tasks
            .fetch_sub(1, Ordering::AcqRel)
            == 1
        {
            self.backend.idle_start(self);
            let total = self.scheduled_tasks.load(Ordering::Acquire);
            self.scheduled_tasks.store(0, Ordering::Release);
            if let Some(start) = *self.start.lock().unwrap() {
                let (update, _) = &mut *self.aggregated_update.lock().unwrap();
                if let Some(update) = update.as_mut() {
                    update.0 += start.elapsed();
                    update.1 += total;
                } else {
                    *update = Some((start.elapsed(), total));
                }
            }
            self.event.notify(usize::MAX);
        }
    }

    fn finish_foreground_job(&self) {
        if self
            .currently_scheduled_foreground_jobs
            .fetch_sub(1, Ordering::AcqRel)
            == 1
        {
            self.event_foreground.notify(usize::MAX);
        }
        self.finish_primary_job();
    }

    pub async fn wait_foreground_done(&self) {
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return;
        }
        let listener = self.event_foreground.listen();
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return;
        }
        listener
            .instrument(trace_span!("wait_foreground_done"))
            .await;
    }

    pub fn get_in_progress_count(&self) -> usize {
        self.currently_scheduled_tasks.load(Ordering::Acquire)
    }

    pub async fn wait_task_completion(
        &self,
        id: TaskId,
        consistency: ReadConsistency,
    ) -> Result<()> {
        read_task_output_untracked(self, id, consistency).await?;
        Ok(())
    }

    #[deprecated(note = "Use get_or_wait_aggregated_update_info instead")]
    pub async fn get_or_wait_update_info(&self, aggregation: Duration) -> (Duration, usize) {
        let UpdateInfo {
            duration, tasks, ..
        } = self.get_or_wait_aggregated_update_info(aggregation).await;
        (duration, tasks)
    }

    #[deprecated(note = "Use aggregated_update_info instead")]
    pub async fn update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<(Duration, usize)> {
        self.aggregated_update_info(aggregation, timeout).await.map(
            |UpdateInfo {
                 duration, tasks, ..
             }| (duration, tasks),
        )
    }

    pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
        self.aggregated_update_info(aggregation, Duration::MAX)
            .await
            .unwrap()
    }

    pub async fn aggregated_update_info(
        &self,
        aggregation: Duration,
        timeout: Duration,
    ) -> Option<UpdateInfo> {
        let listener = self
            .event
            .listen_with_note(|| "wait for update info".to_string());
        let wait_for_finish = {
            let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            if aggregation.is_zero() {
                if let Some((duration, tasks)) = update.take() {
                    return Some(UpdateInfo {
                        duration,
                        tasks,
                        reasons: take(reason_set),
                        placeholder_for_future_fields: (),
                    });
                } else {
                    true
                }
            } else {
                update.is_none()
            }
        };
        if wait_for_finish {
            if timeout == Duration::MAX {
                listener.await;
            } else {
                let start_listener = self
                    .event_start
                    .listen_with_note(|| "wait for update info".to_string());
                if self.currently_scheduled_tasks.load(Ordering::Acquire) == 0 {
                    start_listener.await;
                } else {
                    drop(start_listener);
                }
                if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
                    return None;
                }
            }
        }
        if !aggregation.is_zero() {
            loop {
                select! {
                    () = tokio::time::sleep(aggregation) => {
                        break;
                    }
                    () = self.event.listen_with_note(|| "wait for update info".to_string()) => {
                    }
                }
            }
        }
        let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
        if let Some((duration, tasks)) = update.take() {
            Some(UpdateInfo {
                duration,
                tasks,
                reasons: take(reason_set),
                placeholder_for_future_fields: (),
            })
        } else {
            panic!("aggregated_update_info must not be called concurrently")
        }
    }

    pub async fn wait_background_done(&self) {
        let listener = self.event_background.listen();
        if self
            .currently_scheduled_background_jobs
            .load(Ordering::Acquire)
            != 0
        {
            listener.await;
        }
    }

    pub async fn stop_and_wait(&self) {
        self.backend.stopping(self);
        self.stopped.store(true, Ordering::Release);
        {
            let listener = self.event.listen_with_note(|| "wait for stop".to_string());
            if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
                listener.await;
            }
        }
        {
            let listener = self.event_background.listen();
            if self
                .currently_scheduled_background_jobs
                .load(Ordering::Acquire)
                != 0
            {
                listener.await;
            }
        }
        self.backend.stop(self);
    }

    #[track_caller]
    pub(crate) fn schedule_background_job<
        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
        F: Future<Output = ()> + Send + 'static,
    >(
        &self,
        func: T,
    ) {
        let this = self.pin();
        self.currently_scheduled_background_jobs
            .fetch_add(1, Ordering::AcqRel);
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    while this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
                        let listener = this.event.listen_with_note(|| {
                            "background job waiting for execution".to_string()
                        });
                        if this.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
                            listener.await;
                        }
                    }
                    let this2 = this.clone();
                    if !this.stopped.load(Ordering::Acquire) {
                        func(this).await;
                    }
                    if this2
                        .currently_scheduled_background_jobs
                        .fetch_sub(1, Ordering::AcqRel)
                        == 1
                    {
                        this2.event_background.notify(usize::MAX);
                    }
                })
                .in_current_span(),
        );
    }

    #[track_caller]
    pub(crate) fn schedule_foreground_job<
        T: FnOnce(Arc<TurboTasks<B>>) -> F + Send + 'static,
        F: Future<Output = ()> + Send + 'static,
    >(
        &self,
        func: T,
    ) {
        let this = self.pin();
        this.begin_foreground_job();
        tokio::spawn(
            TURBO_TASKS
                .scope(this.clone(), async move {
                    if !this.stopped.load(Ordering::Acquire) {
                        func(this.clone()).await;
                    }
                    this.finish_foreground_job();
                })
                .in_current_span(),
        );
    }

    fn finish_current_task_state(&self) -> bool {
        let (stateful, tasks) = CURRENT_TASK_STATE.with(|cell| {
            let CurrentTaskState {
                tasks_to_notify,
                stateful,
                ..
            } = &mut *cell.write().unwrap();
            (*stateful, take(tasks_to_notify))
        });

        if !tasks.is_empty() {
            self.backend.invalidate_tasks(&tasks, self);
        }
        stateful
    }

    pub fn backend(&self) -> &B {
        &self.backend
    }
}

impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
    fn dynamic_call(
        &self,
        func: FunctionId,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.dynamic_call(func, this, arg, persistence)
    }
    fn native_call(
        &self,
        func: FunctionId,
        this: Option<RawVc>,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.native_call(func, this, arg, persistence)
    }
    fn trait_call(
        &self,
        trait_type: TraitTypeId,
        trait_fn_name: Cow<'static, str>,
        this: RawVc,
        arg: Box<dyn MagicAny>,
        persistence: TaskPersistence,
    ) -> RawVc {
        self.trait_call(trait_type, trait_fn_name, this, arg, persistence)
    }

    #[track_caller]
    fn run_once(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId {
        self.spawn_once_task(async move {
            future.await?;
            Ok(Completion::new())
        })
    }

    #[track_caller]
    fn run_once_with_reason(
        &self,
        reason: StaticOrArc<dyn InvalidationReason>,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId {
        {
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        self.spawn_once_task(async move {
            future.await?;
            Ok(Completion::new())
        })
    }

    #[track_caller]
    fn run_once_process(
        &self,
        future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> TaskId {
        let this = self.pin();
        self.spawn_once_task(async move {
            this.finish_primary_job();
            future.await?;
            this.begin_primary_job();
            Ok(Completion::new())
        })
    }
}

impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksApi> {
        self.pin()
    }

    #[instrument(level = Level::INFO, skip_all, name = "invalidate")]
    fn invalidate(&self, task: TaskId) {
        self.backend.invalidate_task(task, self);
    }

    #[instrument(level = Level::INFO, skip_all, name = "invalidate", fields(name = display(&reason)))]
    fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
        {
            let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
            reason_set.insert(reason);
        }
        self.backend.invalidate_task(task, self);
    }

    fn invalidate_serialization(&self, task: TaskId) {
        self.backend.invalidate_serialization(task, self);
    }

    fn notify_scheduled_tasks(&self) {
        let _ = CURRENT_TASK_STATE.try_with(|cell| {
            let tasks = {
                let CurrentTaskState {
                    tasks_to_notify, ..
                } = &mut *cell.write().unwrap();
                take(tasks_to_notify)
            };
            if tasks.is_empty() {
                return;
            }
            self.backend.invalidate_tasks(&tasks, self);
        });
    }

    fn try_read_task_output(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
    ) -> Result<Result<RawVc, EventListener>> {
        self.backend
            .try_read_task_output(task, current_task("reading Vcs"), consistency, self)
    }

    fn try_read_task_output_untracked(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
    ) -> Result<Result<RawVc, EventListener>> {
        self.backend
            .try_read_task_output_untracked(task, consistency, self)
    }

    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.backend
            .try_read_task_cell(task, index, current_task("reading Vcs"), options, self)
    }

    fn try_read_task_cell_untracked(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<Result<TypedCellContent, EventListener>> {
        self.backend
            .try_read_task_cell_untracked(task, index, options, self)
    }

    fn try_read_own_task_cell_untracked(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent> {
        self.backend
            .try_read_own_task_cell_untracked(current_task, index, options, self)
    }

    fn try_read_local_output(
        &self,
        execution_id: ExecutionId,
        local_task_id: LocalTaskId,
    ) -> Result<Result<RawVc, EventListener>> {
        CURRENT_TASK_STATE.with(|gts| {
            let gts_read = gts.read().unwrap();

            gts_read.assert_execution_id(execution_id);

            match gts_read.get_local_task(local_task_id) {
                LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
                LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
            }
        })
    }

    fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
        self.backend.read_task_collectibles(
            task,
            trait_id,
            current_task("reading collectibles"),
            self,
        )
    }

    fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
        self.backend.emit_collectible(
            trait_type,
            collectible,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
        self.backend.unemit_collectible(
            trait_type,
            collectible,
            count,
            current_task("emitting collectible"),
            self,
        );
    }

    fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
        for (&collectible, &count) in collectibles {
            if count > 0 {
                self.backend.unemit_collectible(
                    trait_type,
                    collectible,
                    count as u32,
                    current_task("emitting collectible"),
                    self,
                );
            }
        }
    }

    fn read_own_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
    ) -> Result<TypedCellContent> {
        self.try_read_own_task_cell_untracked(task, index, options)
    }

    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
        self.backend.update_task_cell(task, index, content, self);
    }

    fn connect_task(&self, task: TaskId) {
        self.backend
            .connect_task(task, current_task("connecting task"), self);
    }

    fn mark_own_task_as_finished(&self, task: TaskId) {
        self.backend.mark_own_task_as_finished(task, self);
    }

    fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
        self.backend
            .set_own_task_aggregation_number(task, aggregation_number, self);
    }

    fn mark_own_task_as_session_dependent(&self, task: TaskId) {
        self.backend.mark_own_task_as_session_dependent(task, self);
    }

    fn detached_for_testing(
        &self,
        fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
        let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
        let tracked_fut = {
            let ts = global_task_state.read().unwrap();
            ts.local_task_tracker.track_future(fut)
        };
        Box::pin(TURBO_TASKS.scope(
            turbo_tasks(),
            CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
        ))
    }

    fn task_statistics(&self) -> &TaskStatisticsApi {
        self.backend.task_statistics()
    }

    fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let this = self.pin();
        Box::pin(async move {
            this.stop_and_wait().await;
        })
    }

    fn subscribe_to_compilation_events(
        &self,
        event_types: Option<Vec<String>>,
    ) -> Receiver<Arc<dyn CompilationEvent>> {
        self.compilation_events.subscribe(event_types)
    }

    fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
        if let Err(e) = self.compilation_events.send(event) {
            tracing::warn!("Failed to send compilation event: {e}");
        }
    }
}

impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
    fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
        self.pin()
    }
    fn backend(&self) -> &B {
        &self.backend
    }

    #[track_caller]
    fn schedule_backend_background_job(&self, id: BackendJobId) {
        self.schedule_background_job(move |this| async move {
            this.backend.run_backend_job(id, &*this).await;
        })
    }

    #[track_caller]
    fn schedule_backend_foreground_job(&self, id: BackendJobId) {
        self.schedule_foreground_job(move |this| async move {
            this.backend.run_backend_job(id, &*this).await;
        })
    }

    fn try_foreground_done(&self) -> Result<(), EventListener> {
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return Ok(());
        }
        let listener = self.event_foreground.listen();
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return Ok(());
        }
        Err(listener)
    }

    fn wait_foreground_done_excluding_own<'a>(
        &'a self,
    ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + 'a>>> {
        if self
            .currently_scheduled_foreground_jobs
            .load(Ordering::Acquire)
            == 0
        {
            return None;
        }
        Some(Box::pin(async {
            self.finish_foreground_job();
            self.wait_foreground_done().await;
            self.begin_foreground_job();
        }))
    }

    fn schedule_notify_tasks(&self, tasks: &[TaskId]) {
        let result = CURRENT_TASK_STATE.try_with(|cell| {
            let CurrentTaskState {
                tasks_to_notify, ..
            } = &mut *cell.write().unwrap();
            tasks_to_notify.extend(tasks.iter());
        });
        if result.is_err() {
            let _guard = trace_span!("schedule_notify_tasks", count = tasks.len()).entered();
            self.backend.invalidate_tasks(tasks, self);
        }
    }

    fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) {
        let result = CURRENT_TASK_STATE.try_with(|cell| {
            let CurrentTaskState {
                tasks_to_notify, ..
            } = &mut *cell.write().unwrap();
            tasks_to_notify.extend(tasks.iter());
        });
        if result.is_err() {
            let _guard = trace_span!("schedule_notify_tasks_set", count = tasks.len()).entered();
            self.backend.invalidate_tasks_set(tasks, self);
        };
    }

    #[track_caller]
    fn schedule(&self, task: TaskId) {
        self.schedule(task)
    }

    fn program_duration_until(&self, instant: Instant) -> Duration {
        instant - self.program_start
    }

    fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
        unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
    }

    fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
        unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
    }

    unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.task_id_factory.reuse(id.into()) }
    }

    unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
        unsafe { self.transient_task_id_factory.reuse(id.into()) }
    }

    fn read_task_state_dyn(&self, func: &mut dyn FnMut(&B::TaskState)) {
        CURRENT_TASK_STATE
            .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
    }

    fn write_task_state_dyn(&self, func: &mut dyn FnMut(&mut B::TaskState)) {
        CURRENT_TASK_STATE
            .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
    }

    fn is_idle(&self) -> bool {
        self.currently_scheduled_tasks.load(Ordering::Acquire) == 0
    }
}

pub(crate) fn current_task(from: &str) -> TaskId {
    match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
        Ok(id) => id,
        Err(_) => panic!("{from} can only be used in the context of turbo_tasks task execution"),
    }
}

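/// Spawns a one-off (uncached) task on `tt`, waits for it to finish, and
/// returns its result.
///
/// ```ignore
/// // Minimal sketch, assuming `tt: Arc<dyn TurboTasksApi>` is already set up.
/// let value = run_once(tt, async move { anyhow::Ok(21 * 2) }).await?;
/// assert_eq!(value, 42);
/// ```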
pub async fn run_once<T: Send + 'static>(
    tt: Arc<dyn TurboTasksApi>,
    future: impl Future<Output = Result<T>> + Send + 'static,
) -> Result<T> {
    let (tx, rx) = tokio::sync::oneshot::channel();

    let task_id = tt.run_once(Box::pin(async move {
        let result = future.await?;
        tx.send(result)
            .map_err(|_| anyhow!("unable to send result"))?;
        Ok(())
    }));

    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
    let raw_future = raw_result.into_read().untracked();
    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;

    Ok(rx.await?)
}

pub async fn run_once_with_reason<T: Send + 'static>(
    tt: Arc<dyn TurboTasksApi>,
    reason: impl InvalidationReason,
    future: impl Future<Output = Result<T>> + Send + 'static,
) -> Result<T> {
    let (tx, rx) = tokio::sync::oneshot::channel();

    let task_id = tt.run_once_with_reason(
        (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
        Box::pin(async move {
            let result = future.await?;
            tx.send(result)
                .map_err(|_| anyhow!("unable to send result"))?;
            Ok(())
        }),
    );

    let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
    let raw_future = raw_result.into_read().untracked();
    turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;

    Ok(rx.await?)
}

pub fn dynamic_call(
    func: FunctionId,
    this: Option<RawVc>,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
}

pub fn trait_call(
    trait_type: TraitTypeId,
    trait_fn_name: Cow<'static, str>,
    this: RawVc,
    arg: Box<dyn MagicAny>,
    persistence: TaskPersistence,
) -> RawVc {
    with_turbo_tasks(|tt| tt.trait_call(trait_type, trait_fn_name, this, arg, persistence))
}

pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
    TURBO_TASKS.with(|arc| arc.clone())
}

pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
    TURBO_TASKS.with(|arc| func(arc))
}

pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
    TURBO_TASKS.sync_scope(tt, f)
}

pub fn turbo_tasks_future_scope<T>(
    tt: Arc<dyn TurboTasksApi>,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(tt, f)
}

pub fn with_turbo_tasks_for_testing<T>(
    tt: Arc<dyn TurboTasksApi>,
    current_task: TaskId,
    execution_id: ExecutionId,
    f: impl Future<Output = T>,
) -> impl Future<Output = T> {
    TURBO_TASKS.scope(
        tt,
        CURRENT_TASK_STATE.scope(
            Arc::new(RwLock::new(CurrentTaskState::new(
                current_task,
                execution_id,
                Box::new(()),
            ))),
            f,
        ),
    )
}

pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
    tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
}

pub fn current_task_for_testing() -> TaskId {
    CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}

pub fn mark_session_dependent() {
    with_turbo_tasks(|tt| {
        tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
    });
}

pub fn mark_root() {
    with_turbo_tasks(|tt| {
        tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
    });
}

pub fn mark_finished() {
    with_turbo_tasks(|tt| {
        tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
    });
}

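/// Marks the current task as stateful, i.e. holding state that cannot be
/// reconstructed by simply re-executing the task. Returns a
/// [`SerializationInvalidator`] for the current task.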
pub fn mark_stateful() -> SerializationInvalidator {
    CURRENT_TASK_STATE.with(|cell| {
        let CurrentTaskState {
            stateful, task_id, ..
        } = &mut *cell.write().unwrap();
        *stateful = true;
        SerializationInvalidator::new(*task_id)
    })
}

pub fn prevent_gc() {
    mark_stateful();
}

pub fn notify_scheduled_tasks() {
    with_turbo_tasks(|tt| tt.notify_scheduled_tasks())
}

pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
    with_turbo_tasks(|tt| {
        let raw_vc = collectible.node.node;
        tt.emit_collectible(T::get_trait_type_id(), raw_vc)
    })
}

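/// Runs a blocking closure on tokio's blocking thread pool while keeping the
/// current turbo-tasks scope and tracing span, and accounts its wall time and
/// allocations to the current task.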
pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
    let turbo_tasks = turbo_tasks();
    let span = Span::current();
    let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
        let _guard = span.entered();
        let start = Instant::now();
        let start_allocations = TurboMalloc::allocation_counters();
        let r = turbo_tasks_scope(turbo_tasks, func);
        (r, start.elapsed(), start_allocations.until_now())
    })
    .await
    .unwrap();
    capture_future::add_duration(duration);
    capture_future::add_allocation_info(alloc_info);
    result
}

pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
    let handle = Handle::current();
    let span = info_span!("thread").or_current();
    thread::spawn(move || {
        let span = span.entered();
        let guard = handle.enter();
        func();
        drop(guard);
        drop(span);
    });
}

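/// Reads a task's output, retrying until the task has finished: each attempt
/// either yields the value or an [`EventListener`] to await before retrying.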
pub(crate) async fn read_task_output(
    this: &dyn TurboTasksApi,
    id: TaskId,
    consistency: ReadConsistency,
) -> Result<RawVc> {
    loop {
        match this.try_read_task_output(id, consistency)? {
            Ok(result) => return Ok(result),
            Err(listener) => listener.await,
        }
    }
}

pub(crate) async fn read_task_output_untracked(
    this: &dyn TurboTasksApi,
    id: TaskId,
    consistency: ReadConsistency,
) -> Result<RawVc> {
    loop {
        match this.try_read_task_output_untracked(id, consistency)? {
            Ok(result) => return Ok(result),
            Err(listener) => listener.await,
        }
    }
}

pub(crate) async fn read_task_cell(
    this: &dyn TurboTasksApi,
    id: TaskId,
    index: CellId,
    options: ReadCellOptions,
) -> Result<TypedCellContent> {
    loop {
        match this.try_read_task_cell(id, index, options)? {
            Ok(result) => return Ok(result),
            Err(listener) => listener.await,
        }
    }
}

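/// A reference to a cell of the currently executing task. Obtained from
/// [`find_cell_by_type`] and used to write or conditionally update that cell's
/// content.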
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct CurrentCellRef {
    current_task: TaskId,
    index: CellId,
}

type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;

impl CurrentCellRef {
    pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
    where
        T: VcValueType,
    {
        self.conditional_update_with_shared_reference(|old_shared_reference| {
            let old_ref = old_shared_reference
                .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
                .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
            let new_value = functor(old_ref)?;
            Some(SharedReference::new(triomphe::Arc::new(
                <T::Read as VcRead<T>>::value_to_repr(new_value),
            )))
        })
    }

    pub fn conditional_update_with_shared_reference(
        &self,
        functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
    ) {
        let tt = turbo_tasks();
        let cell_content = tt
            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
            .ok();
        let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
        if let Some(update) = update {
            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
        }
    }

    pub fn compare_and_update<T>(&self, new_value: T)
    where
        T: PartialEq + VcValueType,
    {
        self.conditional_update(|old_value| {
            if let Some(old_value) = old_value {
                if old_value == &new_value {
                    return None;
                }
            }
            Some(new_value)
        });
    }

    pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
    where
        T: VcValueType + PartialEq,
    {
        fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
            <T::Read as VcRead<T>>::repr_to_value_ref(
                sr.0.downcast_ref::<VcReadRepr<T>>()
                    .expect("cannot update SharedReference of different type"),
            )
        }
        self.conditional_update_with_shared_reference(|old_sr| {
            if let Some(old_sr) = old_sr {
                let old_value: &T = extract_sr_value(old_sr);
                let new_value = extract_sr_value(&new_shared_reference);
                if old_value == new_value {
                    return None;
                }
            }
            Some(new_shared_reference)
        });
    }

    pub fn update<T>(&self, new_value: T)
    where
        T: VcValueType,
    {
        let tt = turbo_tasks();
        tt.update_own_task_cell(
            self.current_task,
            self.index,
            CellContent(Some(SharedReference::new(triomphe::Arc::new(
                <T::Read as VcRead<T>>::value_to_repr(new_value),
            )))),
        )
    }

    pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
        let tt = turbo_tasks();
        let content = tt
            .read_own_task_cell(self.current_task, self.index, ReadCellOptions::default())
            .ok();
        let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
            shared_ref_exp != shared_ref
        } else {
            true
        };
        if update {
            tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
        }
    }
}

impl From<CurrentCellRef> for RawVc {
    fn from(cell: CurrentCellRef) -> Self {
        RawVc::TaskCell(cell.current_task, cell.index)
    }
}

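/// Returns a [`CurrentCellRef`] for the next cell of type `ty` in the currently
/// executing task, assigning cell indices per value type in creation order.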
pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
    CURRENT_TASK_STATE.with(|ts| {
        let current_task = current_task("celling turbo_tasks values");
        let mut ts = ts.write().unwrap();
        let map = ts.cell_counters.as_mut().unwrap();
        let current_index = map.entry(ty).or_default();
        let index = *current_index;
        *current_index += 1;
        CurrentCellRef {
            current_task,
            index: CellId { type_id: ty, index },
        }
    })
}

pub(crate) async fn read_local_output(
    this: &dyn TurboTasksApi,
    execution_id: ExecutionId,
    local_task_id: LocalTaskId,
) -> Result<RawVc> {
    loop {
        match this.try_read_local_output(execution_id, local_task_id)? {
            Ok(raw_vc) => return Ok(raw_vc),
            Err(event_listener) => event_listener.await,
        }
    }
}