1use std::{
2 borrow::Cow,
3 error::Error,
4 fmt::{self, Debug, Display},
5 future::Future,
6 hash::{BuildHasherDefault, Hash},
7 pin::Pin,
8 sync::Arc,
9};
10
11use anyhow::{Result, anyhow};
12use auto_hash_map::AutoMap;
13use rustc_hash::FxHasher;
14use serde::{Deserialize, Serialize};
15use tracing::Span;
16use turbo_rcstr::RcStr;
17
18use crate::{
19 RawVc, ReadCellOptions, ReadOutputOptions, ReadRef, SharedReference, TaskId, TaskIdSet,
20 TraitRef, TraitTypeId, TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType,
21 event::EventListener, macro_helpers::NativeFunction, magic_any::MagicAny,
22 manager::TurboTasksBackendApi, raw_vc::CellId, registry,
23 task::shared_reference::TypedSharedReference, task_statistics::TaskStatisticsApi,
24 triomphe_utils::unchecked_sidecast_triomphe_arc,
25};
26
/// Boxed, thread-safe (`Send + Sync`) factory for the future of a transient root task.
///
/// Every call returns a fresh pinned, boxed, `Send` future that resolves to a `Result<RawVc>`.
/// Because the factory is an `Fn`, it can be called any number of times.
pub type TransientTaskRoot =
    Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> + Send + Sync>;
29
/// The work carried by a transient task, as passed to [`Backend::create_transient_task`].
pub enum TransientTaskType {
    /// A re-invocable factory ([`TransientTaskRoot`]) that produces the task's future.
    // NOTE(review): presumably the backend calls the factory again whenever the task must be
    // re-executed — confirm against the backend implementation.
    Root(TransientTaskRoot),

    /// A single, already-constructed `'static` future; unlike `Root` there is no factory to
    /// build another one.
    // NOTE(review): presumably executed exactly once without re-execution — confirm.
    Once(Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>),
}
49
50impl Debug for TransientTaskType {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 match self {
53 Self::Root(_) => f.debug_tuple("Root").finish(),
54 Self::Once(_) => f.debug_tuple("Once").finish(),
55 }
56 }
57}
58
/// Describes a task invocation: a native function together with the inputs it is called with.
///
/// `PartialEq`, `Hash`, `Display` and the serde traits are implemented by hand for this type
/// elsewhere in this file; only `Debug` and `Eq` are derived.
#[derive(Debug, Eq)]
pub struct CachedTaskType {
    /// The native function this task runs.
    pub native_fn: &'static NativeFunction,
    /// Optional `this` value for the call.
    // NOTE(review): presumably the receiver of method-style functions — confirm.
    pub this: Option<RawVc>,
    /// Type-erased argument value passed to the function.
    pub arg: Box<dyn MagicAny>,
}
67
68impl CachedTaskType {
69 pub fn get_name(&self) -> &'static str {
72 self.native_fn.name
73 }
74}
75
/// Field-wise equality over the function, the optional `this`, and the type-erased argument.
impl PartialEq for CachedTaskType {
    // The `arg` boxes are compared through explicit references, which is exactly what
    // `clippy::op_ref` flags; the lint is expected here instead of being "fixed".
    // NOTE(review): presumably this works around `==` trying to move out of a
    // `Box<dyn Trait>` operand (rust-lang/rust#31740) — confirm before simplifying.
    #[expect(clippy::op_ref)]
    fn eq(&self, other: &Self) -> bool {
        self.native_fn == other.native_fn && self.this == other.this && &self.arg == &other.arg
    }
}
84
85impl Hash for CachedTaskType {
88 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
89 self.native_fn.hash(state);
90 self.this.hash(state);
91 self.arg.hash(state);
92 }
93}
94
95impl Display for CachedTaskType {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.write_str(self.get_name())
98 }
99}
100
101mod ser {
102 use std::any::Any;
103
104 use serde::{
105 Deserialize, Deserializer, Serialize, Serializer,
106 de::{self},
107 ser::{SerializeSeq, SerializeTuple},
108 };
109
110 use super::*;
111
112 impl Serialize for TypedCellContent {
113 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
114 where
115 S: Serializer,
116 {
117 let value_type = registry::get_value_type(self.0);
118 let serializable = if let Some(value) = &self.1.0 {
119 value_type.any_as_serializable(&value.0)
120 } else {
121 None
122 };
123 let mut state = serializer.serialize_tuple(3)?;
124 state.serialize_element(&self.0)?;
125 if let Some(serializable) = serializable {
126 state.serialize_element(&true)?;
127 state.serialize_element(serializable)?;
128 } else {
129 state.serialize_element(&false)?;
130 state.serialize_element(&())?;
131 }
132 state.end()
133 }
134 }
135
136 impl<'de> Deserialize<'de> for TypedCellContent {
137 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
138 where
139 D: Deserializer<'de>,
140 {
141 struct Visitor;
142
143 impl<'de> serde::de::Visitor<'de> for Visitor {
144 type Value = TypedCellContent;
145
146 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
147 write!(formatter, "a valid TypedCellContent")
148 }
149
150 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
151 where
152 A: de::SeqAccess<'de>,
153 {
154 let value_type: ValueTypeId = seq
155 .next_element()?
156 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
157 let has_value: bool = seq
158 .next_element()?
159 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
160 if has_value {
161 let seed = registry::get_value_type(value_type)
162 .get_any_deserialize_seed()
163 .ok_or_else(|| {
164 de::Error::custom("Value type doesn't support deserialization")
165 })?;
166 let value = seq
167 .next_element_seed(seed)?
168 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
169 let arc = triomphe::Arc::<dyn Any + Send + Sync>::from(value);
170 Ok(TypedCellContent(
171 value_type,
172 CellContent(Some(SharedReference(arc))),
173 ))
174 } else {
175 let () = seq
176 .next_element()?
177 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
178 Ok(TypedCellContent(value_type, CellContent(None)))
179 }
180 }
181 }
182
183 deserializer.deserialize_tuple(2, Visitor)
184 }
185 }
186
187 enum FunctionAndArg<'a> {
188 Owned {
189 native_fn: &'static NativeFunction,
190 arg: Box<dyn MagicAny>,
191 },
192 Borrowed {
193 native_fn: &'static NativeFunction,
194 arg: &'a dyn MagicAny,
195 },
196 }
197
198 impl Serialize for FunctionAndArg<'_> {
199 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
200 where
201 S: Serializer,
202 {
203 let FunctionAndArg::Borrowed { native_fn, arg } = self else {
204 unreachable!();
205 };
206 let mut state = serializer.serialize_seq(Some(2))?;
207 state.serialize_element(®istry::get_function_id(native_fn))?;
208 let arg = *arg;
209 let arg = native_fn.arg_meta.as_serialize(arg);
210 state.serialize_element(arg)?;
211 state.end()
212 }
213 }
214
215 impl<'de> Deserialize<'de> for FunctionAndArg<'de> {
216 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
217 struct Visitor;
218 impl<'de> serde::de::Visitor<'de> for Visitor {
219 type Value = FunctionAndArg<'de>;
220
221 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
222 write!(formatter, "a valid FunctionAndArg")
223 }
224
225 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
226 where
227 A: serde::de::SeqAccess<'de>,
228 {
229 let fn_id = seq
230 .next_element()?
231 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
232 let native_fn = registry::get_native_function(fn_id);
233 let seed = native_fn.arg_meta.deserialization_seed();
234 let arg = seq
235 .next_element_seed(seed)?
236 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
237 Ok(FunctionAndArg::Owned { native_fn, arg })
238 }
239 }
240 deserializer.deserialize_seq(Visitor)
241 }
242 }
243
244 impl Serialize for CachedTaskType {
245 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
246 where
247 S: ser::Serializer,
248 {
249 let CachedTaskType {
250 native_fn,
251 this,
252 arg,
253 } = self;
254 let mut s = serializer.serialize_tuple(2)?;
255 s.serialize_element(&FunctionAndArg::Borrowed {
256 native_fn,
257 arg: &**arg,
258 })?;
259 s.serialize_element(this)?;
260 s.end()
261 }
262 }
263
264 impl<'de> Deserialize<'de> for CachedTaskType {
265 fn deserialize<D: ser::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
266 struct Visitor;
267 impl<'de> serde::de::Visitor<'de> for Visitor {
268 type Value = CachedTaskType;
269
270 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
271 write!(formatter, "a valid PersistentTaskType")
272 }
273
274 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
275 where
276 A: serde::de::SeqAccess<'de>,
277 {
278 let FunctionAndArg::Owned { native_fn, arg } = seq
279 .next_element()?
280 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?
281 else {
282 unreachable!();
283 };
284 let this = seq
285 .next_element()?
286 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
287 Ok(CachedTaskType {
288 native_fn,
289 this,
290 arg,
291 })
292 }
293 }
294 deserializer.deserialize_tuple(2, Visitor)
295 }
296 }
297}
298
/// What [`Backend::try_start_task_execution`] hands back for a task that should run.
pub struct TaskExecutionSpec<'a> {
    /// The future to drive; it resolves to the task's `Result<RawVc>` and may borrow for `'a`.
    pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'a>>,
    /// Tracing span associated with this execution.
    // NOTE(review): presumably the caller enters this span while polling `future` — confirm.
    pub span: Span,
}
303
/// Untyped content of a cell: `None` is an empty cell (displayed as `empty`), `Some` holds a
/// shared reference to the stored value. The default is the empty cell.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct CellContent(pub Option<SharedReference>);
/// A [`CellContent`] paired with the [`ValueTypeId`] of the value type it belongs to.
///
/// The type id is present even when the content itself is empty.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TypedCellContent(pub ValueTypeId, pub CellContent);
308
309impl Display for CellContent {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 match &self.0 {
312 None => write!(f, "empty"),
313 Some(content) => Display::fmt(content, f),
314 }
315 }
316}
317
318impl TypedCellContent {
319 pub fn cast<T: VcValueType>(self) -> Result<ReadRef<T>> {
320 let data = self.1.0.ok_or_else(|| anyhow!("Cell is empty"))?;
321 let data = data
322 .downcast::<<T::Read as VcRead<T>>::Repr>()
323 .map_err(|_err| anyhow!("Unexpected type in cell"))?;
324 let data = unsafe { unchecked_sidecast_triomphe_arc(data) };
327 Ok(ReadRef::new_arc(data))
328 }
329
330 pub fn cast_trait<T>(self) -> Result<TraitRef<T>>
335 where
336 T: VcValueTrait + ?Sized,
337 {
338 let shared_reference = self
339 .1
340 .0
341 .ok_or_else(|| anyhow!("Cell is empty"))?
342 .into_typed(self.0);
343 Ok(
344 TraitRef::new(shared_reference),
346 )
347 }
348
349 pub fn into_untyped(self) -> CellContent {
350 self.1
351 }
352}
353
354impl From<TypedSharedReference> for TypedCellContent {
355 fn from(value: TypedSharedReference) -> Self {
356 TypedCellContent(value.type_id, CellContent(Some(value.reference)))
357 }
358}
359
360impl TryFrom<TypedCellContent> for TypedSharedReference {
361 type Error = TypedCellContent;
362
363 fn try_from(content: TypedCellContent) -> Result<Self, TypedCellContent> {
364 if let TypedCellContent(type_id, CellContent(Some(reference))) = content {
365 Ok(TypedSharedReference { type_id, reference })
366 } else {
367 Err(content)
368 }
369 }
370}
371
372impl CellContent {
373 pub fn into_typed(self, type_id: ValueTypeId) -> TypedCellContent {
374 TypedCellContent(type_id, self)
375 }
376}
377
378impl From<SharedReference> for CellContent {
379 fn from(value: SharedReference) -> Self {
380 CellContent(Some(value))
381 }
382}
383
384impl From<Option<SharedReference>> for CellContent {
385 fn from(value: Option<SharedReference>) -> Self {
386 CellContent(value)
387 }
388}
389
390impl TryFrom<CellContent> for SharedReference {
391 type Error = CellContent;
392
393 fn try_from(content: CellContent) -> Result<Self, CellContent> {
394 if let CellContent(Some(shared_reference)) = content {
395 Ok(shared_reference)
396 } else {
397 Err(content)
398 }
399 }
400}
401
/// Map from a collectible (`RawVc`) to a signed `i32` count, hashed with `FxHasher`; this is
/// the return type of [`Backend::read_task_collectibles`].
// NOTE(review): presumably the count is the net number of emits minus unemits, and the
// trailing `1` is `AutoMap`'s inline-capacity parameter — confirm.
pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;
403
/// Message text of a [`TurboTasksError`], tagged by whether it is considered PII-safe.
// NOTE(review): "PII" presumably stands for personally identifiable information, i.e. the tag
// decides whether the text may be reported externally — confirm with the consumers.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TurboTasksExecutionErrorMessage {
    /// A message tagged as PII-safe; may borrow a `'static` string without allocating.
    PIISafe(Cow<'static, str>),
    /// A message not tagged as PII-safe. The `From<&dyn Error>` conversion in this file
    /// always produces this variant.
    NonPIISafe(String),
}
411
412impl Display for TurboTasksExecutionErrorMessage {
413 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
414 match self {
415 TurboTasksExecutionErrorMessage::PIISafe(msg) => write!(f, "{msg}"),
416 TurboTasksExecutionErrorMessage::NonPIISafe(msg) => write!(f, "{msg}"),
417 }
418 }
419}
420
/// One link of a captured error chain: a message plus the optional error that caused it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TurboTasksError {
    /// The text this error displays as.
    pub message: TurboTasksExecutionErrorMessage,
    /// The underlying cause, if any; exposed through `Error::source`.
    pub source: Option<TurboTasksExecutionError>,
}
426
/// Context layer recording which task an error passed through; displayed as
/// `Execution of <task> failed`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TurboTaskContextError {
    /// Textual description of the task (the `Display` output passed to `with_task_context`).
    pub task: RcStr,
    /// Id of the task; only compiled in with the `task_id_details` feature, and then included
    /// in the displayed message when present.
    #[cfg(feature = "task_id_details")]
    pub task_id: Option<TaskId>,
    /// The error that occurred while executing the task, if any.
    pub source: Option<TurboTasksExecutionError>,
}
434
/// Cloneable, serializable error produced by task execution.
///
/// Every variant stores its payload behind an `Arc`, so cloning does not copy the payload.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TurboTasksExecutionError {
    /// A captured panic; reports no `source`.
    Panic(Arc<TurboTasksPanic>),
    /// A regular error with a message and an optional cause.
    Error(Arc<TurboTasksError>),
    /// A task-context layer wrapped around another error.
    TaskContext(Arc<TurboTaskContextError>),
}
441
442impl TurboTasksExecutionError {
443 pub fn with_task_context(&self, task: impl Display, _task_id: Option<TaskId>) -> Self {
444 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
445 task: RcStr::from(task.to_string()),
446 #[cfg(feature = "task_id_details")]
447 task_id: _task_id,
448 source: Some(self.clone()),
449 }))
450 }
451}
452
453impl Error for TurboTasksExecutionError {
454 fn source(&self) -> Option<&(dyn Error + 'static)> {
455 match self {
456 TurboTasksExecutionError::Panic(_panic) => None,
457 TurboTasksExecutionError::Error(error) => {
458 error.source.as_ref().map(|s| s as &dyn Error)
459 }
460 TurboTasksExecutionError::TaskContext(context_error) => {
461 context_error.source.as_ref().map(|s| s as &dyn Error)
462 }
463 }
464 }
465}
466
467impl Display for TurboTasksExecutionError {
468 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469 match self {
470 TurboTasksExecutionError::Panic(panic) => write!(f, "{}", &panic),
471 TurboTasksExecutionError::Error(error) => {
472 write!(f, "{}", error.message)
473 }
474 TurboTasksExecutionError::TaskContext(context_error) => {
475 #[cfg(feature = "task_id_details")]
476 if let Some(task_id) = context_error.task_id {
477 return write!(
478 f,
479 "Execution of {} ({}) failed",
480 context_error.task, task_id
481 );
482 }
483 write!(f, "Execution of {} failed", context_error.task)
484 }
485 }
486 }
487}
488
489impl<'l> From<&'l (dyn std::error::Error + 'static)> for TurboTasksExecutionError {
490 fn from(err: &'l (dyn std::error::Error + 'static)) -> Self {
491 if let Some(err) = err.downcast_ref::<TurboTasksExecutionError>() {
492 return err.clone();
493 }
494 let message = err.to_string();
495 let source = err.source().map(|source| source.into());
496
497 TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
498 message: TurboTasksExecutionErrorMessage::NonPIISafe(message),
499 source,
500 }))
501 }
502}
503
504impl From<anyhow::Error> for TurboTasksExecutionError {
505 fn from(err: anyhow::Error) -> Self {
506 let current: &(dyn std::error::Error + 'static) = err.as_ref();
507 current.into()
508 }
509}
510
/// Mode flag passed to [`Backend::update_task_cell`].
// NOTE(review): the variants are not interpreted anywhere in this file. Presumably
// `EqualityCheck` asks the backend to verify the new cell content against the existing one
// and `Skip` disables that verification — confirm against the backend implementation.
pub enum VerificationMode {
    EqualityCheck,
    Skip,
}
515
/// Interface a task-storage/execution backend has to provide.
///
/// Most methods receive the `TurboTasksBackendApi` handle they may call back into. Methods
/// with a body have a default implementation (a no-op unless stated otherwise); all others
/// must be provided by the implementor. When each method is invoked is decided by the caller
/// and is not visible in this file.
pub trait Backend: Sync + Send {
    /// Lifecycle hook; does nothing by default.
    #[allow(unused_variables)]
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}

    /// Lifecycle hook; does nothing by default.
    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
    /// Lifecycle hook; does nothing by default.
    // NOTE(review): presumably called when shutdown begins, before `stop` — confirm.
    #[allow(unused_variables)]
    fn stopping(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}

    /// Idle-notification hook; does nothing by default.
    #[allow(unused_variables)]
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
    /// Idle-notification hook; does nothing by default.
    #[allow(unused_variables)]
    fn idle_end(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}

    /// Invalidates a single task.
    fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Invalidates every task in the slice.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
    /// Invalidates every task in the set.
    fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Does nothing by default.
    // NOTE(review): presumably marks the task's serialized state as stale — confirm.
    fn invalidate_serialization(
        &self,
        _task: TaskId,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Returns a textual description of the task.
    fn get_task_description(&self, task: TaskId) -> String;

    /// Asks the backend for the future (and tracing span) with which to execute `task`.
    // NOTE(review): `None` presumably means the task should not be executed right now —
    // confirm.
    fn try_start_task_execution<'a>(
        &'a self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'a>>;

    /// Notifies the backend that the execution of `task` was canceled.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Reports the outcome of an execution of `task`, together with per-value-type cell
    /// counters and the `stateful` / `has_invalidator` flags collected during the run.
    // NOTE(review): the meaning of the returned `bool` is not visible here; presumably it
    // tells the caller whether the task has to be executed again — confirm.
    fn task_execution_completed(
        &self,
        task: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool;

    /// Backend-defined unit of background work, consumed by [`Backend::run_backend_job`].
    type BackendJob: Send + 'static;

    /// Returns the future that performs `job`.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

    /// Attempts to read the output of `task`.
    ///
    /// The outer `Result` carries errors; the inner one is either the output or an
    /// `EventListener`.
    // NOTE(review): presumably the listener fires once the output becomes available, and a
    // `reader` of `None` means no dependency is tracked for this read — confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        reader: Option<TaskId>,
        options: ReadOutputOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Attempts to read the cell `index` of `task`.
    ///
    /// The outer `Result` carries errors; the inner one is either the cell content or an
    /// `EventListener`.
    // NOTE(review): presumably the listener fires once the cell becomes available, and a
    // `reader` of `None` means no dependency is tracked for this read — confirm.
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        reader: Option<TaskId>,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads a cell of the currently executing task.
    ///
    /// The default implementation delegates to [`Backend::try_read_task_cell`] without a
    /// reader. If that yields a listener instead of content, the listener is discarded and
    /// an empty cell of type `index.type_id` is returned.
    fn try_read_own_task_cell(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        match self.try_read_task_cell(current_task, index, None, options, turbo_tasks)? {
            Ok(content) => Ok(content),
            Err(_) => Ok(TypedCellContent(index.type_id, CellContent(None))),
        }
    }

    /// Returns the collectibles of trait `trait_id` for `task`, with their counts.
    // NOTE(review): a `reader` of `None` presumably means no dependency is tracked — confirm.
    fn read_task_collectibles(
        &self,
        task: TaskId,
        trait_id: TraitTypeId,
        reader: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskCollectiblesMap;

    /// Records that `task` emitted `collectible` for trait `trait_type`.
    fn emit_collectible(
        &self,
        trait_type: TraitTypeId,
        collectible: RawVc,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Takes back `count` emissions of `collectible` for trait `trait_type` from `task`.
    fn unemit_collectible(
        &self,
        trait_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Stores `content` in the cell `index` of `task`, using the given verification mode.
    fn update_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        verification_mode: VerificationMode,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Returns the id of the persistent task for `task_type`, creating the task if needed.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId;

    /// Returns the id of the transient task for `task_type`, creating the task if needed.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId;

    /// Connects `task` to `parent_task`.
    // NOTE(review): presumably registers `task` as a child of the parent — confirm.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: Option<TaskId>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Does nothing by default.
    fn mark_own_task_as_finished(
        &self,
        _task: TaskId,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Does nothing by default.
    fn set_own_task_aggregation_number(
        &self,
        _task: TaskId,
        _aggregation_number: u32,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Does nothing by default.
    fn mark_own_task_as_session_dependent(
        &self,
        _task: TaskId,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Creates a new transient task from `task_type` and returns its id.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId;

    /// Disposes the given root task.
    fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Returns the backend's task-statistics handle.
    fn task_statistics(&self) -> &TaskStatisticsApi;

    /// Reports whether this backend tracks dependencies.
    fn is_tracking_dependencies(&self) -> bool;
}