1use std::{
2 fmt::Display,
3 future::Future,
4 hash::BuildHasherDefault,
5 mem::take,
6 pin::Pin,
7 sync::{
8 Arc, Mutex, RwLock, Weak,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use anyhow::{Result, anyhow};
15use auto_hash_map::AutoMap;
16use rustc_hash::FxHasher;
17use serde::{Deserialize, Serialize};
18use tokio::{select, sync::mpsc::Receiver, task_local};
19use tokio_util::task::TaskTracker;
20use tracing::{Instrument, instrument};
21
22use crate::{
23 Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
24 ReadOutputOptions, ResolvedVc, SharedReference, TaskId, TraitMethod, ValueTypeId, Vc, VcRead,
25 VcValueTrait, VcValueType,
26 backend::{
27 Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
28 TransientTaskType, TurboTasksExecutionError, TypedCellContent,
29 },
30 capture_future::CaptureFuture,
31 event::{Event, EventListener},
32 id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
33 id_factory::IdFactoryWithReuse,
34 macro_helpers::NativeFunction,
35 magic_any::MagicAny,
36 message_queue::{CompilationEvent, CompilationEventQueue},
37 raw_vc::{CellId, RawVc},
38 registry,
39 serialization_invalidation::SerializationInvalidator,
40 task::local_task::{LocalTask, LocalTaskSpec, LocalTaskType},
41 task_statistics::TaskStatisticsApi,
42 trace::TraceRawVcs,
43 util::{IdFactory, StaticOrArc},
44};
45
46pub trait TurboTasksCallApi: Sync + Send {
49 fn dynamic_call(
52 &self,
53 native_fn: &'static NativeFunction,
54 this: Option<RawVc>,
55 arg: Box<dyn MagicAny>,
56 persistence: TaskPersistence,
57 ) -> RawVc;
58 fn native_call(
61 &self,
62 native_fn: &'static NativeFunction,
63 this: Option<RawVc>,
64 arg: Box<dyn MagicAny>,
65 persistence: TaskPersistence,
66 ) -> RawVc;
67 fn trait_call(
70 &self,
71 trait_method: &'static TraitMethod,
72 this: RawVc,
73 arg: Box<dyn MagicAny>,
74 persistence: TaskPersistence,
75 ) -> RawVc;
76
77 fn run(
78 &self,
79 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>>;
81 fn run_once(
82 &self,
83 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
84 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
85 fn run_once_with_reason(
86 &self,
87 reason: StaticOrArc<dyn InvalidationReason>,
88 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
90 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
91}
92
93pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
99 fn invalidate(&self, task: TaskId);
100 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);
101
102 fn invalidate_serialization(&self, task: TaskId);
103
104 fn try_read_task_output(
105 &self,
106 task: TaskId,
107 options: ReadOutputOptions,
108 ) -> Result<Result<RawVc, EventListener>>;
109
110 fn try_read_task_cell(
111 &self,
112 task: TaskId,
113 index: CellId,
114 options: ReadCellOptions,
115 ) -> Result<Result<TypedCellContent, EventListener>>;
116
117 fn try_read_local_output(
132 &self,
133 execution_id: ExecutionId,
134 local_task_id: LocalTaskId,
135 ) -> Result<Result<RawVc, EventListener>>;
136
137 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
138
139 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
140 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32);
141 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap);
142
143 fn try_read_own_task_cell(
146 &self,
147 current_task: TaskId,
148 index: CellId,
149 options: ReadCellOptions,
150 ) -> Result<TypedCellContent>;
151
152 fn read_own_task_cell(
153 &self,
154 task: TaskId,
155 index: CellId,
156 options: ReadCellOptions,
157 ) -> Result<TypedCellContent>;
158 fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
159 fn mark_own_task_as_finished(&self, task: TaskId);
160 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32);
161 fn mark_own_task_as_session_dependent(&self, task: TaskId);
162
163 fn connect_task(&self, task: TaskId);
164
165 fn detached_for_testing(
170 &self,
171 f: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
172 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
173
174 fn task_statistics(&self) -> &TaskStatisticsApi;
175
176 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
177
178 fn subscribe_to_compilation_events(
179 &self,
180 event_types: Option<Vec<String>>,
181 ) -> Receiver<Arc<dyn CompilationEvent>>;
182 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>);
183
184 fn is_tracking_dependencies(&self) -> bool;
186}
187
188pub struct Unused<T> {
190 inner: T,
191}
192
193impl<T> Unused<T> {
194 pub unsafe fn new_unchecked(inner: T) -> Self {
200 Self { inner }
201 }
202
203 pub unsafe fn get_unchecked(&self) -> &T {
209 &self.inner
210 }
211
212 pub fn into(self) -> T {
214 self.inner
215 }
216}
217
218pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
220 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;
221
222 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
223 fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
224 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
228 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);
232
233 fn schedule(&self, task: TaskId);
235
236 fn schedule_backend_foreground_job(&self, job: B::BackendJob);
238
239 fn schedule_backend_background_job(&self, job: B::BackendJob);
244
245 fn program_duration_until(&self, instant: Instant) -> Duration;
247
248 fn is_idle(&self) -> bool;
250
251 fn backend(&self) -> &B;
253}
254
255#[allow(clippy::manual_non_exhaustive)]
256pub struct UpdateInfo {
257 pub duration: Duration,
258 pub tasks: usize,
259 pub reasons: InvalidationReasonSet,
260 #[allow(dead_code)]
261 placeholder_for_future_fields: (),
262}
263
264#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
265pub enum TaskPersistence {
266 Persistent,
268
269 Transient,
276
277 Local,
286}
287
288#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
289pub enum ReadConsistency {
290 #[default]
293 Eventual,
294 Strong,
299}
300
301#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
302pub enum ReadTracking {
303 #[default]
305 Tracked,
306 TrackOnlyError,
311 Untracked,
316}
317
318impl ReadTracking {
319 pub fn should_track(&self, is_err: bool) -> bool {
320 match self {
321 ReadTracking::Tracked => true,
322 ReadTracking::TrackOnlyError => is_err,
323 ReadTracking::Untracked => false,
324 }
325 }
326}
327
328impl Display for ReadTracking {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 match self {
331 ReadTracking::Tracked => write!(f, "tracked"),
332 ReadTracking::TrackOnlyError => write!(f, "track only error"),
333 ReadTracking::Untracked => write!(f, "untracked"),
334 }
335 }
336}
337
338pub struct TurboTasks<B: Backend + 'static> {
339 this: Weak<Self>,
340 backend: B,
341 task_id_factory: IdFactoryWithReuse<TaskId>,
342 transient_task_id_factory: IdFactoryWithReuse<TaskId>,
343 execution_id_factory: IdFactory<ExecutionId>,
344 stopped: AtomicBool,
345 currently_scheduled_foreground_jobs: AtomicUsize,
346 currently_scheduled_background_jobs: AtomicUsize,
347 scheduled_tasks: AtomicUsize,
348 start: Mutex<Option<Instant>>,
349 aggregated_update: Mutex<(Option<(Duration, usize)>, InvalidationReasonSet)>,
350 event_foreground_start: Event,
352 event_foreground_done: Event,
355 event_background_done: Event,
357 program_start: Instant,
358 compilation_events: CompilationEventQueue,
359}
360
361struct CurrentTaskState {
370 task_id: Option<TaskId>,
371 execution_id: ExecutionId,
372
373 stateful: bool,
375
376 has_invalidator: bool,
378
379 cell_counters: Option<AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>>,
384
385 local_tasks: Vec<LocalTask>,
387
388 local_task_tracker: TaskTracker,
391}
392
393impl CurrentTaskState {
394 fn new(task_id: TaskId, execution_id: ExecutionId) -> Self {
395 Self {
396 task_id: Some(task_id),
397 execution_id,
398 stateful: false,
399 has_invalidator: false,
400 cell_counters: Some(AutoMap::default()),
401 local_tasks: Vec::new(),
402 local_task_tracker: TaskTracker::new(),
403 }
404 }
405
406 fn new_temporary(execution_id: ExecutionId) -> Self {
407 Self {
408 task_id: None,
409 execution_id,
410 stateful: false,
411 has_invalidator: false,
412 cell_counters: None,
413 local_tasks: Vec::new(),
414 local_task_tracker: TaskTracker::new(),
415 }
416 }
417
418 fn assert_execution_id(&self, expected_execution_id: ExecutionId) {
419 if self.execution_id != expected_execution_id {
420 panic!(
421 "Local tasks can only be scheduled/awaited within the same execution of the \
422 parent task that created them"
423 );
424 }
425 }
426
427 fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
428 self.local_tasks.push(local_task);
429 if cfg!(debug_assertions) {
431 LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
432 } else {
433 unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
434 }
435 }
436
437 fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask {
438 &self.local_tasks[(*local_task_id as usize) - 1]
440 }
441
442 fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
443 &mut self.local_tasks[(*local_task_id as usize) - 1]
444 }
445}
446
447task_local! {
449 static TURBO_TASKS: Arc<dyn TurboTasksApi>;
451
452 static CURRENT_TASK_STATE: Arc<RwLock<CurrentTaskState>>;
453}
454
455impl<B: Backend + 'static> TurboTasks<B> {
456 pub fn new(backend: B) -> Arc<Self> {
462 let task_id_factory = IdFactoryWithReuse::new(
463 TaskId::MIN,
464 TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
465 );
466 let transient_task_id_factory =
467 IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
468 let execution_id_factory = IdFactory::new(ExecutionId::MIN, ExecutionId::MAX);
469 let this = Arc::new_cyclic(|this| Self {
470 this: this.clone(),
471 backend,
472 task_id_factory,
473 transient_task_id_factory,
474 execution_id_factory,
475 stopped: AtomicBool::new(false),
476 currently_scheduled_foreground_jobs: AtomicUsize::new(0),
477 currently_scheduled_background_jobs: AtomicUsize::new(0),
478 scheduled_tasks: AtomicUsize::new(0),
479 start: Default::default(),
480 aggregated_update: Default::default(),
481 event_foreground_done: Event::new(|| {
482 || "TurboTasks::event_foreground_done".to_string()
483 }),
484 event_foreground_start: Event::new(|| {
485 || "TurboTasks::event_foreground_start".to_string()
486 }),
487 event_background_done: Event::new(|| {
488 || "TurboTasks::event_background_done".to_string()
489 }),
490 program_start: Instant::now(),
491 compilation_events: CompilationEventQueue::default(),
492 });
493 this.backend.startup(&*this);
494 this
495 }
496
497 pub fn pin(&self) -> Arc<Self> {
498 self.this.upgrade().unwrap()
499 }
500
501 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
503 where
504 T: ?Sized,
505 F: Fn() -> Fut + Send + Sync + Clone + 'static,
506 Fut: Future<Output = Result<Vc<T>>> + Send,
507 {
508 let id = self.backend.create_transient_task(
509 TransientTaskType::Root(Box::new(move || {
510 let functor = functor.clone();
511 Box::pin(async move {
512 let raw_vc = functor().await?.node;
513 raw_vc.to_non_local().await
514 })
515 })),
516 self,
517 );
518 self.schedule(id);
519 id
520 }
521
522 pub fn dispose_root_task(&self, task_id: TaskId) {
523 self.backend.dispose_root_task(task_id, self);
524 }
525
526 #[track_caller]
530 fn spawn_once_task<T, Fut>(&self, future: Fut)
531 where
532 T: ?Sized,
533 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
534 {
535 let id = self.backend.create_transient_task(
536 TransientTaskType::Once(Box::pin(async move {
537 let raw_vc = future.await?.node;
538 raw_vc.to_non_local().await
539 })),
540 self,
541 );
542 self.schedule(id);
543 }
544
545 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
546 &self,
547 future: impl Future<Output = Result<T>> + Send + 'static,
548 ) -> Result<T> {
549 let (tx, rx) = tokio::sync::oneshot::channel();
550 self.spawn_once_task(async move {
551 let result = future.await;
552 tx.send(result)
553 .map_err(|_| anyhow!("unable to send result"))?;
554 Ok(Completion::new())
555 });
556
557 rx.await?
558 }
559
560 #[tracing::instrument(level = "trace", skip_all, name = "turbo_tasks::run")]
561 pub async fn run<T: TraceRawVcs + Send + 'static>(
562 &self,
563 future: impl Future<Output = Result<T>> + Send + 'static,
564 ) -> Result<T, TurboTasksExecutionError> {
565 self.begin_foreground_job();
566 let execution_id = self.execution_id_factory.wrapping_get();
568 let current_task_state =
569 Arc::new(RwLock::new(CurrentTaskState::new_temporary(execution_id)));
570
571 let result = TURBO_TASKS
572 .scope(
573 self.pin(),
574 CURRENT_TASK_STATE.scope(current_task_state, async {
575 let (result, _duration, _alloc_info) = CaptureFuture::new(future).await;
576
577 let ltt =
579 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().local_task_tracker.clone());
580 ltt.close();
581 ltt.wait().await;
582
583 match result {
584 Ok(Ok(raw_vc)) => Ok(raw_vc),
585 Ok(Err(err)) => Err(err.into()),
586 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
587 }
588 }),
589 )
590 .await;
591 self.finish_foreground_job();
592 result
593 }
594
595 pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
596 let this = self.pin();
597 tokio::spawn(async move {
598 this.pin()
599 .run_once(async move {
600 this.finish_foreground_job();
601 future.await;
602 this.begin_foreground_job();
603 Ok(())
604 })
605 .await
606 .unwrap()
607 });
608 }
609
610 pub(crate) fn native_call(
611 &self,
612 native_fn: &'static NativeFunction,
613 this: Option<RawVc>,
614 arg: Box<dyn MagicAny>,
615 persistence: TaskPersistence,
616 ) -> RawVc {
617 match persistence {
618 TaskPersistence::Local => {
619 let task_type = LocalTaskSpec {
620 task_type: LocalTaskType::Native { native_fn },
621 this,
622 arg,
623 };
624 self.schedule_local_task(task_type, persistence)
625 }
626 TaskPersistence::Transient => {
627 let task_type = CachedTaskType {
628 native_fn,
629 this,
630 arg,
631 };
632
633 RawVc::TaskOutput(self.backend.get_or_create_transient_task(
634 task_type,
635 current_task_if_available("turbo_function calls"),
636 self,
637 ))
638 }
639 TaskPersistence::Persistent => {
640 let task_type = CachedTaskType {
641 native_fn,
642 this,
643 arg,
644 };
645
646 RawVc::TaskOutput(self.backend.get_or_create_persistent_task(
647 task_type,
648 current_task_if_available("turbo_function calls"),
649 self,
650 ))
651 }
652 }
653 }
654
655 pub fn dynamic_call(
656 &self,
657 native_fn: &'static NativeFunction,
658 this: Option<RawVc>,
659 arg: Box<dyn MagicAny>,
660 persistence: TaskPersistence,
661 ) -> RawVc {
662 if this.is_none_or(|this| this.is_resolved()) && native_fn.arg_meta.is_resolved(&*arg) {
663 return self.native_call(native_fn, this, arg, persistence);
664 }
665 let task_type = LocalTaskSpec {
666 task_type: LocalTaskType::ResolveNative { native_fn },
667 this,
668 arg,
669 };
670 self.schedule_local_task(task_type, persistence)
671 }
672
673 pub fn trait_call(
674 &self,
675 trait_method: &'static TraitMethod,
676 this: RawVc,
677 arg: Box<dyn MagicAny>,
678 persistence: TaskPersistence,
679 ) -> RawVc {
680 if let RawVc::TaskCell(_, CellId { type_id, .. }) = this {
684 match registry::get_value_type(type_id).get_trait_method(trait_method) {
685 Some(native_fn) => {
686 let arg = native_fn.arg_meta.filter_owned(arg);
687 return self.dynamic_call(native_fn, Some(this), arg, persistence);
688 }
689 None => {
690 }
694 }
695 }
696
697 let task_type = LocalTaskSpec {
699 task_type: LocalTaskType::ResolveTrait { trait_method },
700 this: Some(this),
701 arg,
702 };
703
704 self.schedule_local_task(task_type, persistence)
705 }
706
707 #[track_caller]
708 pub(crate) fn schedule(&self, task_id: TaskId) {
709 self.begin_foreground_job();
710 self.scheduled_tasks.fetch_add(1, Ordering::AcqRel);
711
712 let this = self.pin();
713 let future = async move {
714 let mut schedule_again = true;
715 while schedule_again {
716 let execution_id = this.execution_id_factory.wrapping_get();
718 let current_task_state =
719 Arc::new(RwLock::new(CurrentTaskState::new(task_id, execution_id)));
720 let single_execution_future = async {
721 if this.stopped.load(Ordering::Acquire) {
722 this.backend.task_execution_canceled(task_id, &*this);
723 return false;
724 }
725
726 let Some(TaskExecutionSpec { future, span }) =
727 this.backend.try_start_task_execution(task_id, &*this)
728 else {
729 return false;
730 };
731
732 async {
733 let (result, duration, alloc_info) = CaptureFuture::new(future).await;
734
735 let ltt = CURRENT_TASK_STATE
737 .with(|ts| ts.read().unwrap().local_task_tracker.clone());
738 ltt.close();
739 ltt.wait().await;
740
741 let result = match result {
742 Ok(Ok(raw_vc)) => Ok(raw_vc),
743 Ok(Err(err)) => Err(err.into()),
744 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
745 };
746
747 let FinishedTaskState {
748 stateful,
749 has_invalidator,
750 } = this.finish_current_task_state();
751 let cell_counters = CURRENT_TASK_STATE
752 .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
753 this.backend.task_execution_completed(
754 task_id,
755 duration,
756 alloc_info.memory_usage(),
757 result,
758 &cell_counters,
759 stateful,
760 has_invalidator,
761 &*this,
762 )
763 }
764 .instrument(span)
765 .await
766 };
767 schedule_again = CURRENT_TASK_STATE
768 .scope(current_task_state, single_execution_future)
769 .await;
770 }
771 this.finish_foreground_job();
772 anyhow::Ok(())
773 };
774
775 let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
776
777 #[cfg(feature = "tokio_tracing")]
778 {
779 let description = self.backend.get_task_description(task_id);
780 tokio::task::Builder::new()
781 .name(&description)
782 .spawn(future)
783 .unwrap();
784 }
785 #[cfg(not(feature = "tokio_tracing"))]
786 tokio::task::spawn(future);
787 }
788
789 fn schedule_local_task(
790 &self,
791 ty: LocalTaskSpec,
792 persistence: TaskPersistence,
800 ) -> RawVc {
801 let task_type = ty.task_type;
802 let (global_task_state, parent_task_id, execution_id, local_task_id) = CURRENT_TASK_STATE
803 .with(|gts| {
804 let mut gts_write = gts.write().unwrap();
805 let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
806 done_event: Event::new(move || {
807 move || format!("LocalTask({task_type})::done_event")
808 }),
809 });
810 (
811 Arc::clone(gts),
812 gts_write.task_id,
813 gts_write.execution_id,
814 local_task_id,
815 )
816 });
817
818 #[cfg(feature = "tokio_tracing")]
819 let description = format!(
820 "[local] (parent: {}) {}",
821 self.backend.get_task_description(parent_task_id),
822 ty.task_type,
823 );
824 #[cfg(not(feature = "tokio_tracing"))]
825 let _ = parent_task_id; let this = self.pin();
828 let future = async move {
829 let TaskExecutionSpec { future, span } =
830 crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
831 async move {
832 let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
833
834 let result = match result {
835 Ok(Ok(raw_vc)) => Ok(raw_vc),
836 Ok(Err(err)) => Err(err.into()),
837 Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
838 };
839
840 let local_task = LocalTask::Done {
841 output: match result {
842 Ok(raw_vc) => OutputContent::Link(raw_vc),
843 Err(err) => OutputContent::Error(err.with_task_context(task_type)),
844 },
845 };
846
847 let done_event = CURRENT_TASK_STATE.with(move |gts| {
848 let mut gts_write = gts.write().unwrap();
849 let scheduled_task =
850 std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task);
851 let LocalTask::Scheduled { done_event } = scheduled_task else {
852 panic!("local task finished, but was not in the scheduled state?");
853 };
854 done_event
855 });
856 done_event.notify(usize::MAX)
857 }
858 .instrument(span)
859 .await
860 };
861 let future = global_task_state
862 .read()
863 .unwrap()
864 .local_task_tracker
865 .track_future(future);
866 let future = CURRENT_TASK_STATE.scope(global_task_state, future);
867 let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();
868
869 #[cfg(feature = "tokio_tracing")]
870 tokio::task::Builder::new()
871 .name(&description)
872 .spawn(future)
873 .unwrap();
874 #[cfg(not(feature = "tokio_tracing"))]
875 tokio::task::spawn(future);
876
877 RawVc::LocalOutput(execution_id, local_task_id, persistence)
878 }
879
880 fn begin_foreground_job(&self) {
881 if self
882 .currently_scheduled_foreground_jobs
883 .fetch_add(1, Ordering::AcqRel)
884 == 0
885 {
886 *self.start.lock().unwrap() = Some(Instant::now());
887 self.event_foreground_start.notify(usize::MAX);
888 self.backend.idle_end(self);
889 }
890 }
891
892 fn finish_foreground_job(&self) {
893 if self
894 .currently_scheduled_foreground_jobs
895 .fetch_sub(1, Ordering::AcqRel)
896 == 1
897 {
898 self.backend.idle_start(self);
899 let total = self.scheduled_tasks.load(Ordering::Acquire);
902 self.scheduled_tasks.store(0, Ordering::Release);
903 if let Some(start) = *self.start.lock().unwrap() {
904 let (update, _) = &mut *self.aggregated_update.lock().unwrap();
905 if let Some(update) = update.as_mut() {
906 update.0 += start.elapsed();
907 update.1 += total;
908 } else {
909 *update = Some((start.elapsed(), total));
910 }
911 }
912 self.event_foreground_done.notify(usize::MAX);
913 }
914 }
915
916 fn begin_background_job(&self) {
917 self.currently_scheduled_background_jobs
918 .fetch_add(1, Ordering::Relaxed);
919 }
920
921 fn finish_background_job(&self) {
922 if self
923 .currently_scheduled_background_jobs
924 .fetch_sub(1, Ordering::Relaxed)
925 == 1
926 {
927 self.event_background_done.notify(usize::MAX);
928 }
929 }
930
931 pub fn get_in_progress_count(&self) -> usize {
932 self.currently_scheduled_foreground_jobs
933 .load(Ordering::Acquire)
934 }
935
936 pub async fn wait_task_completion(
948 &self,
949 id: TaskId,
950 consistency: ReadConsistency,
951 ) -> Result<()> {
952 read_task_output(
953 self,
954 id,
955 ReadOutputOptions {
956 tracking: ReadTracking::Untracked,
958 consistency,
959 },
960 )
961 .await?;
962 Ok(())
963 }
964
965 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
968 self.aggregated_update_info(aggregation, Duration::MAX)
969 .await
970 .unwrap()
971 }
972
973 pub async fn aggregated_update_info(
977 &self,
978 aggregation: Duration,
979 timeout: Duration,
980 ) -> Option<UpdateInfo> {
981 let listener = self
982 .event_foreground_done
983 .listen_with_note(|| || "wait for update info".to_string());
984 let wait_for_finish = {
985 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
986 if aggregation.is_zero() {
987 if let Some((duration, tasks)) = update.take() {
988 return Some(UpdateInfo {
989 duration,
990 tasks,
991 reasons: take(reason_set),
992 placeholder_for_future_fields: (),
993 });
994 } else {
995 true
996 }
997 } else {
998 update.is_none()
999 }
1000 };
1001 if wait_for_finish {
1002 if timeout == Duration::MAX {
1003 listener.await;
1005 } else {
1006 let start_listener = self
1008 .event_foreground_start
1009 .listen_with_note(|| || "wait for update info".to_string());
1010 if self
1011 .currently_scheduled_foreground_jobs
1012 .load(Ordering::Acquire)
1013 == 0
1014 {
1015 start_listener.await;
1016 } else {
1017 drop(start_listener);
1018 }
1019 if timeout.is_zero() || tokio::time::timeout(timeout, listener).await.is_err() {
1020 return None;
1022 }
1023 }
1024 }
1025 if !aggregation.is_zero() {
1026 loop {
1027 select! {
1028 () = tokio::time::sleep(aggregation) => {
1029 break;
1030 }
1031 () = self.event_foreground_done.listen_with_note(|| || "wait for update info".to_string()) => {
1032 }
1034 }
1035 }
1036 }
1037 let (update, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1038 if let Some((duration, tasks)) = update.take() {
1039 Some(UpdateInfo {
1040 duration,
1041 tasks,
1042 reasons: take(reason_set),
1043 placeholder_for_future_fields: (),
1044 })
1045 } else {
1046 panic!("aggregated_update_info must not called concurrently")
1047 }
1048 }
1049
1050 pub async fn wait_background_done(&self) {
1051 let listener = self.event_background_done.listen();
1052 if self
1053 .currently_scheduled_background_jobs
1054 .load(Ordering::Acquire)
1055 != 0
1056 {
1057 listener.await;
1058 }
1059 }
1060
1061 pub async fn stop_and_wait(&self) {
1062 turbo_tasks_future_scope(self.pin(), async move {
1063 self.backend.stopping(self);
1064 self.stopped.store(true, Ordering::Release);
1065 {
1066 let listener = self
1067 .event_foreground_done
1068 .listen_with_note(|| || "wait for stop".to_string());
1069 if self
1070 .currently_scheduled_foreground_jobs
1071 .load(Ordering::Acquire)
1072 != 0
1073 {
1074 listener.await;
1075 }
1076 }
1077 {
1078 let listener = self.event_background_done.listen();
1079 if self
1080 .currently_scheduled_background_jobs
1081 .load(Ordering::Acquire)
1082 != 0
1083 {
1084 listener.await;
1085 }
1086 }
1087 self.backend.stop(self);
1088 })
1089 .await;
1090 }
1091
1092 #[track_caller]
1093 pub(crate) fn schedule_foreground_job<T>(&self, func: T)
1094 where
1095 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1096 T::CallOnceFuture: Send,
1097 {
1098 let mut this = self.pin();
1099 this.begin_foreground_job();
1100 tokio::spawn(
1101 TURBO_TASKS
1102 .scope(this.clone(), async move {
1103 if !this.stopped.load(Ordering::Acquire) {
1104 this = func(this.clone()).await;
1105 }
1106 this.finish_foreground_job();
1107 })
1108 .in_current_span(),
1109 );
1110 }
1111
1112 #[track_caller]
1113 pub(crate) fn schedule_background_job<T>(&self, func: T)
1114 where
1115 T: AsyncFnOnce(Arc<TurboTasks<B>>) -> Arc<TurboTasks<B>> + Send + 'static,
1116 T::CallOnceFuture: Send,
1117 {
1118 let mut this = self.pin();
1119 self.begin_background_job();
1120 tokio::spawn(
1121 TURBO_TASKS
1122 .scope(this.clone(), async move {
1123 if !this.stopped.load(Ordering::Acquire) {
1124 this = func(this).await;
1125 }
1126 this.finish_background_job();
1127 })
1128 .in_current_span(),
1129 );
1130 }
1131
1132 fn finish_current_task_state(&self) -> FinishedTaskState {
1133 let (stateful, has_invalidator) = CURRENT_TASK_STATE.with(|cell| {
1134 let CurrentTaskState {
1135 stateful,
1136 has_invalidator,
1137 ..
1138 } = &mut *cell.write().unwrap();
1139 (*stateful, *has_invalidator)
1140 });
1141
1142 FinishedTaskState {
1143 stateful,
1144 has_invalidator,
1145 }
1146 }
1147
1148 pub fn backend(&self) -> &B {
1149 &self.backend
1150 }
1151}
1152
1153struct FinishedTaskState {
1154 stateful: bool,
1156
1157 has_invalidator: bool,
1159}
1160
1161impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
1162 fn dynamic_call(
1163 &self,
1164 native_fn: &'static NativeFunction,
1165 this: Option<RawVc>,
1166 arg: Box<dyn MagicAny>,
1167 persistence: TaskPersistence,
1168 ) -> RawVc {
1169 self.dynamic_call(native_fn, this, arg, persistence)
1170 }
1171 fn native_call(
1172 &self,
1173 native_fn: &'static NativeFunction,
1174 this: Option<RawVc>,
1175 arg: Box<dyn MagicAny>,
1176 persistence: TaskPersistence,
1177 ) -> RawVc {
1178 self.native_call(native_fn, this, arg, persistence)
1179 }
1180 fn trait_call(
1181 &self,
1182 trait_method: &'static TraitMethod,
1183 this: RawVc,
1184 arg: Box<dyn MagicAny>,
1185 persistence: TaskPersistence,
1186 ) -> RawVc {
1187 self.trait_call(trait_method, this, arg, persistence)
1188 }
1189
1190 #[track_caller]
1191 fn run(
1192 &self,
1193 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1194 ) -> Pin<Box<dyn Future<Output = Result<(), TurboTasksExecutionError>> + Send>> {
1195 let this = self.pin();
1196 Box::pin(async move { this.run(future).await })
1197 }
1198
1199 #[track_caller]
1200 fn run_once(
1201 &self,
1202 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1203 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1204 let this = self.pin();
1205 Box::pin(async move { this.run_once(future).await })
1206 }
1207
1208 #[track_caller]
1209 fn run_once_with_reason(
1210 &self,
1211 reason: StaticOrArc<dyn InvalidationReason>,
1212 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1213 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1214 {
1215 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1216 reason_set.insert(reason);
1217 }
1218 let this = self.pin();
1219 Box::pin(async move { this.run_once(future).await })
1220 }
1221
1222 #[track_caller]
1223 fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1224 self.start_once_process(future)
1225 }
1226}
1227
1228impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
1229 #[instrument(level = "info", skip_all, name = "invalidate")]
1230 fn invalidate(&self, task: TaskId) {
1231 self.backend.invalidate_task(task, self);
1232 }
1233
1234 #[instrument(level = "info", skip_all, name = "invalidate", fields(name = display(&reason)))]
1235 fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>) {
1236 {
1237 let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
1238 reason_set.insert(reason);
1239 }
1240 self.backend.invalidate_task(task, self);
1241 }
1242
1243 fn invalidate_serialization(&self, task: TaskId) {
1244 self.backend.invalidate_serialization(task, self);
1245 }
1246
1247 fn try_read_task_output(
1248 &self,
1249 task: TaskId,
1250 options: ReadOutputOptions,
1251 ) -> Result<Result<RawVc, EventListener>> {
1252 self.backend.try_read_task_output(
1253 task,
1254 current_task_if_available("reading Vcs"),
1255 options,
1256 self,
1257 )
1258 }
1259
1260 fn try_read_task_cell(
1261 &self,
1262 task: TaskId,
1263 index: CellId,
1264 options: ReadCellOptions,
1265 ) -> Result<Result<TypedCellContent, EventListener>> {
1266 self.backend.try_read_task_cell(
1267 task,
1268 index,
1269 current_task_if_available("reading Vcs"),
1270 options,
1271 self,
1272 )
1273 }
1274
1275 fn try_read_own_task_cell(
1276 &self,
1277 current_task: TaskId,
1278 index: CellId,
1279 options: ReadCellOptions,
1280 ) -> Result<TypedCellContent> {
1281 self.backend
1282 .try_read_own_task_cell(current_task, index, options, self)
1283 }
1284
1285 fn try_read_local_output(
1286 &self,
1287 execution_id: ExecutionId,
1288 local_task_id: LocalTaskId,
1289 ) -> Result<Result<RawVc, EventListener>> {
1290 CURRENT_TASK_STATE.with(|gts| {
1291 let gts_read = gts.read().unwrap();
1292
1293 gts_read.assert_execution_id(execution_id);
1298
1299 match gts_read.get_local_task(local_task_id) {
1300 LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
1301 LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
1302 }
1303 })
1304 }
1305
1306 fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
1307 self.backend.read_task_collectibles(
1308 task,
1309 trait_id,
1310 current_task_if_available("reading collectibles"),
1311 self,
1312 )
1313 }
1314
1315 fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc) {
1316 self.backend.emit_collectible(
1317 trait_type,
1318 collectible,
1319 current_task("emitting collectible"),
1320 self,
1321 );
1322 }
1323
1324 fn unemit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc, count: u32) {
1325 self.backend.unemit_collectible(
1326 trait_type,
1327 collectible,
1328 count,
1329 current_task("emitting collectible"),
1330 self,
1331 );
1332 }
1333
1334 fn unemit_collectibles(&self, trait_type: TraitTypeId, collectibles: &TaskCollectiblesMap) {
1335 for (&collectible, &count) in collectibles {
1336 if count > 0 {
1337 self.backend.unemit_collectible(
1338 trait_type,
1339 collectible,
1340 count as u32,
1341 current_task("emitting collectible"),
1342 self,
1343 );
1344 }
1345 }
1346 }
1347
1348 fn read_own_task_cell(
1349 &self,
1350 task: TaskId,
1351 index: CellId,
1352 options: ReadCellOptions,
1353 ) -> Result<TypedCellContent> {
1354 self.try_read_own_task_cell(task, index, options)
1355 }
1356
1357 fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
1358 self.backend.update_task_cell(task, index, content, self);
1359 }
1360
1361 fn connect_task(&self, task: TaskId) {
1362 self.backend
1363 .connect_task(task, current_task_if_available("connecting task"), self);
1364 }
1365
1366 fn mark_own_task_as_finished(&self, task: TaskId) {
1367 self.backend.mark_own_task_as_finished(task, self);
1368 }
1369
1370 fn set_own_task_aggregation_number(&self, task: TaskId, aggregation_number: u32) {
1371 self.backend
1372 .set_own_task_aggregation_number(task, aggregation_number, self);
1373 }
1374
1375 fn mark_own_task_as_session_dependent(&self, task: TaskId) {
1376 self.backend.mark_own_task_as_session_dependent(task, self);
1377 }
1378
1379 fn detached_for_testing(
1382 &self,
1383 fut: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1384 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
1385 let global_task_state = CURRENT_TASK_STATE.with(|ts| ts.clone());
1388 let tracked_fut = {
1389 let ts = global_task_state.read().unwrap();
1390 ts.local_task_tracker.track_future(fut)
1391 };
1392 Box::pin(TURBO_TASKS.scope(
1393 turbo_tasks(),
1394 CURRENT_TASK_STATE.scope(global_task_state, tracked_fut),
1395 ))
1396 }
1397
1398 fn task_statistics(&self) -> &TaskStatisticsApi {
1399 self.backend.task_statistics()
1400 }
1401
1402 fn stop_and_wait(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
1403 let this = self.pin();
1404 Box::pin(async move {
1405 this.stop_and_wait().await;
1406 })
1407 }
1408
1409 fn subscribe_to_compilation_events(
1410 &self,
1411 event_types: Option<Vec<String>>,
1412 ) -> Receiver<Arc<dyn CompilationEvent>> {
1413 self.compilation_events.subscribe(event_types)
1414 }
1415
1416 fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
1417 if let Err(e) = self.compilation_events.send(event) {
1418 tracing::warn!("Failed to send compilation event: {e}");
1419 }
1420 }
1421
1422 fn is_tracking_dependencies(&self) -> bool {
1423 self.backend.is_tracking_dependencies()
1424 }
1425}
1426
1427impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
1428 fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>> {
1429 self.pin()
1430 }
1431 fn backend(&self) -> &B {
1432 &self.backend
1433 }
1434
1435 #[track_caller]
1436 fn schedule_backend_background_job(&self, job: B::BackendJob) {
1437 self.schedule_background_job(async move |this| {
1438 this.backend.run_backend_job(job, &*this).await;
1439 this
1440 })
1441 }
1442
1443 #[track_caller]
1444 fn schedule_backend_foreground_job(&self, job: B::BackendJob) {
1445 self.schedule_foreground_job(async move |this| {
1446 this.backend.run_backend_job(job, &*this).await;
1447 this
1448 })
1449 }
1450
1451 #[track_caller]
1452 fn schedule(&self, task: TaskId) {
1453 self.schedule(task)
1454 }
1455
1456 fn program_duration_until(&self, instant: Instant) -> Duration {
1457 instant - self.program_start
1458 }
1459
1460 fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
1461 unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
1463 }
1464
1465 fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
1466 unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
1468 }
1469
1470 unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
1471 unsafe { self.task_id_factory.reuse(id.into()) }
1472 }
1473
1474 unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
1475 unsafe { self.transient_task_id_factory.reuse(id.into()) }
1476 }
1477
1478 fn is_idle(&self) -> bool {
1479 self.currently_scheduled_foreground_jobs
1480 .load(Ordering::Acquire)
1481 == 0
1482 }
1483}
1484
1485pub(crate) fn current_task_if_available(from: &str) -> Option<TaskId> {
1486 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1487 Ok(id) => id,
1488 Err(_) => panic!(
1489 "{from} can only be used in the context of a turbo_tasks task execution or \
1490 turbo_tasks run"
1491 ),
1492 }
1493}
1494
1495pub(crate) fn current_task(from: &str) -> TaskId {
1496 match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) {
1497 Ok(Some(id)) => id,
1498 Ok(None) | Err(_) => {
1499 panic!("{from} can only be used in the context of a turbo_tasks task execution")
1500 }
1501 }
1502}
1503
1504pub async fn run<T: Send + 'static>(
1505 tt: Arc<dyn TurboTasksApi>,
1506 future: impl Future<Output = Result<T>> + Send + 'static,
1507) -> Result<T> {
1508 let (tx, rx) = tokio::sync::oneshot::channel();
1509
1510 tt.run(Box::pin(async move {
1511 let result = future.await?;
1512 tx.send(result)
1513 .map_err(|_| anyhow!("unable to send result"))?;
1514 Ok(())
1515 }))
1516 .await?;
1517
1518 Ok(rx.await?)
1519}
1520
1521pub async fn run_once<T: Send + 'static>(
1522 tt: Arc<dyn TurboTasksApi>,
1523 future: impl Future<Output = Result<T>> + Send + 'static,
1524) -> Result<T> {
1525 let (tx, rx) = tokio::sync::oneshot::channel();
1526
1527 tt.run_once(Box::pin(async move {
1528 let result = future.await?;
1529 tx.send(result)
1530 .map_err(|_| anyhow!("unable to send result"))?;
1531 Ok(())
1532 }))
1533 .await?;
1534
1535 Ok(rx.await?)
1536}
1537
1538pub async fn run_once_with_reason<T: Send + 'static>(
1539 tt: Arc<dyn TurboTasksApi>,
1540 reason: impl InvalidationReason,
1541 future: impl Future<Output = Result<T>> + Send + 'static,
1542) -> Result<T> {
1543 let (tx, rx) = tokio::sync::oneshot::channel();
1544
1545 tt.run_once_with_reason(
1546 (Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
1547 Box::pin(async move {
1548 let result = future.await?;
1549 tx.send(result)
1550 .map_err(|_| anyhow!("unable to send result"))?;
1551 Ok(())
1552 }),
1553 )
1554 .await?;
1555
1556 Ok(rx.await?)
1557}
1558
1559pub fn dynamic_call(
1561 func: &'static NativeFunction,
1562 this: Option<RawVc>,
1563 arg: Box<dyn MagicAny>,
1564 persistence: TaskPersistence,
1565) -> RawVc {
1566 with_turbo_tasks(|tt| tt.dynamic_call(func, this, arg, persistence))
1567}
1568
1569pub fn trait_call(
1571 trait_method: &'static TraitMethod,
1572 this: RawVc,
1573 arg: Box<dyn MagicAny>,
1574 persistence: TaskPersistence,
1575) -> RawVc {
1576 with_turbo_tasks(|tt| tt.trait_call(trait_method, this, arg, persistence))
1577}
1578
1579pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
1580 TURBO_TASKS.with(|arc| arc.clone())
1581}
1582
1583pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
1584 TURBO_TASKS.try_with(|arc| arc.clone()).ok()
1585}
1586
1587pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
1588 TURBO_TASKS.with(|arc| func(arc))
1589}
1590
1591pub fn turbo_tasks_scope<T>(tt: Arc<dyn TurboTasksApi>, f: impl FnOnce() -> T) -> T {
1592 TURBO_TASKS.sync_scope(tt, f)
1593}
1594
1595pub fn turbo_tasks_future_scope<T>(
1596 tt: Arc<dyn TurboTasksApi>,
1597 f: impl Future<Output = T>,
1598) -> impl Future<Output = T> {
1599 TURBO_TASKS.scope(tt, f)
1600}
1601
1602pub fn with_turbo_tasks_for_testing<T>(
1603 tt: Arc<dyn TurboTasksApi>,
1604 current_task: TaskId,
1605 execution_id: ExecutionId,
1606 f: impl Future<Output = T>,
1607) -> impl Future<Output = T> {
1608 TURBO_TASKS.scope(
1609 tt,
1610 CURRENT_TASK_STATE.scope(
1611 Arc::new(RwLock::new(CurrentTaskState::new(
1612 current_task,
1613 execution_id,
1614 ))),
1615 f,
1616 ),
1617 )
1618}
1619
1620pub fn spawn_detached_for_testing(f: impl Future<Output = Result<()>> + Send + 'static) {
1625 tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(f.in_current_span())));
1626}
1627
1628pub fn current_task_for_testing() -> Option<TaskId> {
1629 CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
1630}
1631
1632pub fn mark_session_dependent() {
1634 with_turbo_tasks(|tt| {
1635 tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()"))
1636 });
1637}
1638
1639pub fn mark_root() {
1642 with_turbo_tasks(|tt| {
1643 tt.set_own_task_aggregation_number(current_task("turbo_tasks::mark_root()"), u32::MAX)
1644 });
1645}
1646
1647pub fn mark_finished() {
1650 with_turbo_tasks(|tt| {
1651 tt.mark_own_task_as_finished(current_task("turbo_tasks::mark_finished()"))
1652 });
1653}
1654
1655pub fn mark_stateful() -> SerializationInvalidator {
1661 CURRENT_TASK_STATE.with(|cell| {
1662 let CurrentTaskState {
1663 stateful, task_id, ..
1664 } = &mut *cell.write().unwrap();
1665 *stateful = true;
1666 let Some(task_id) = *task_id else {
1667 panic!(
1668 "mark_stateful() can only be used in the context of a turbo_tasks task execution"
1669 );
1670 };
1671 SerializationInvalidator::new(task_id)
1672 })
1673}
1674
1675pub fn mark_invalidator() {
1676 CURRENT_TASK_STATE.with(|cell| {
1677 let CurrentTaskState {
1678 has_invalidator, ..
1679 } = &mut *cell.write().unwrap();
1680 *has_invalidator = true;
1681 })
1682}
1683
1684pub fn prevent_gc() {
1685 mark_stateful();
1687}
1688
1689pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
1690 with_turbo_tasks(|tt| {
1691 let raw_vc = collectible.node.node;
1692 tt.emit_collectible(T::get_trait_type_id(), raw_vc)
1693 })
1694}
1695
1696pub(crate) async fn read_task_output(
1697 this: &dyn TurboTasksApi,
1698 id: TaskId,
1699 options: ReadOutputOptions,
1700) -> Result<RawVc> {
1701 loop {
1702 match this.try_read_task_output(id, options)? {
1703 Ok(result) => return Ok(result),
1704 Err(listener) => listener.await,
1705 }
1706 }
1707}
1708
1709pub(crate) async fn read_task_cell(
1710 this: &dyn TurboTasksApi,
1711 id: TaskId,
1712 index: CellId,
1713 options: ReadCellOptions,
1714) -> Result<TypedCellContent> {
1715 loop {
1716 match this.try_read_task_cell(id, index, options)? {
1717 Ok(result) => return Ok(result),
1718 Err(listener) => listener.await,
1719 }
1720 }
1721}
1722
1723#[derive(Clone, Copy, Serialize, Deserialize)]
1729pub struct CurrentCellRef {
1730 current_task: TaskId,
1731 index: CellId,
1732}
1733
1734type VcReadRepr<T> = <<T as VcValueType>::Read as VcRead<T>>::Repr;
1735
1736impl CurrentCellRef {
1737 pub fn conditional_update<T>(&self, functor: impl FnOnce(Option<&T>) -> Option<T>)
1739 where
1740 T: VcValueType,
1741 {
1742 self.conditional_update_with_shared_reference(|old_shared_reference| {
1743 let old_ref = old_shared_reference
1744 .and_then(|sr| sr.0.downcast_ref::<VcReadRepr<T>>())
1745 .map(|content| <T::Read as VcRead<T>>::repr_to_value_ref(content));
1746 let new_value = functor(old_ref)?;
1747 Some(SharedReference::new(triomphe::Arc::new(
1748 <T::Read as VcRead<T>>::value_to_repr(new_value),
1749 )))
1750 })
1751 }
1752
1753 pub fn conditional_update_with_shared_reference(
1755 &self,
1756 functor: impl FnOnce(Option<&SharedReference>) -> Option<SharedReference>,
1757 ) {
1758 let tt = turbo_tasks();
1759 let cell_content = tt
1760 .read_own_task_cell(
1761 self.current_task,
1762 self.index,
1763 ReadCellOptions {
1764 tracking: ReadTracking::Untracked,
1765 ..Default::default()
1766 },
1767 )
1768 .ok();
1769 let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref()));
1770 if let Some(update) = update {
1771 tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(update)))
1772 }
1773 }
1774
1775 pub fn compare_and_update<T>(&self, new_value: T)
1809 where
1810 T: PartialEq + VcValueType,
1811 {
1812 self.conditional_update(|old_value| {
1813 if let Some(old_value) = old_value
1814 && old_value == &new_value
1815 {
1816 return None;
1817 }
1818 Some(new_value)
1819 });
1820 }
1821
1822 pub fn compare_and_update_with_shared_reference<T>(&self, new_shared_reference: SharedReference)
1831 where
1832 T: VcValueType + PartialEq,
1833 {
1834 fn extract_sr_value<T: VcValueType>(sr: &SharedReference) -> &T {
1835 <T::Read as VcRead<T>>::repr_to_value_ref(
1836 sr.0.downcast_ref::<VcReadRepr<T>>()
1837 .expect("cannot update SharedReference of different type"),
1838 )
1839 }
1840 self.conditional_update_with_shared_reference(|old_sr| {
1841 if let Some(old_sr) = old_sr {
1842 let old_value: &T = extract_sr_value(old_sr);
1843 let new_value = extract_sr_value(&new_shared_reference);
1844 if old_value == new_value {
1845 return None;
1846 }
1847 }
1848 Some(new_shared_reference)
1849 });
1850 }
1851
1852 pub fn update<T>(&self, new_value: T)
1854 where
1855 T: VcValueType,
1856 {
1857 let tt = turbo_tasks();
1858 tt.update_own_task_cell(
1859 self.current_task,
1860 self.index,
1861 CellContent(Some(SharedReference::new(triomphe::Arc::new(
1862 <T::Read as VcRead<T>>::value_to_repr(new_value),
1863 )))),
1864 )
1865 }
1866
1867 pub fn update_with_shared_reference(&self, shared_ref: SharedReference) {
1876 let tt = turbo_tasks();
1877 let content = tt
1878 .read_own_task_cell(
1879 self.current_task,
1880 self.index,
1881 ReadCellOptions {
1882 tracking: ReadTracking::Untracked,
1883 ..Default::default()
1884 },
1885 )
1886 .ok();
1887 let update = if let Some(TypedCellContent(_, CellContent(Some(shared_ref_exp)))) = content {
1888 shared_ref_exp != shared_ref
1890 } else {
1891 true
1892 };
1893 if update {
1894 tt.update_own_task_cell(self.current_task, self.index, CellContent(Some(shared_ref)))
1895 }
1896 }
1897}
1898
1899impl From<CurrentCellRef> for RawVc {
1900 fn from(cell: CurrentCellRef) -> Self {
1901 RawVc::TaskCell(cell.current_task, cell.index)
1902 }
1903}
1904
1905pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
1906 CURRENT_TASK_STATE.with(|ts| {
1907 let current_task = current_task("celling turbo_tasks values");
1908 let mut ts = ts.write().unwrap();
1909 let map = ts.cell_counters.as_mut().unwrap();
1910 let current_index = map.entry(ty).or_default();
1911 let index = *current_index;
1912 *current_index += 1;
1913 CurrentCellRef {
1914 current_task,
1915 index: CellId { type_id: ty, index },
1916 }
1917 })
1918}
1919
1920pub(crate) async fn read_local_output(
1921 this: &dyn TurboTasksApi,
1922 execution_id: ExecutionId,
1923 local_task_id: LocalTaskId,
1924) -> Result<RawVc> {
1925 loop {
1926 match this.try_read_local_output(execution_id, local_task_id)? {
1927 Ok(raw_vc) => return Ok(raw_vc),
1928 Err(event_listener) => event_listener.await,
1929 }
1930 }
1931}