1use std::{
2 borrow::Cow,
3 error::Error,
4 fmt::{self, Debug, Display},
5 future::Future,
6 hash::{BuildHasherDefault, Hash},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use anyhow::{Result, anyhow};
13use auto_hash_map::AutoMap;
14use rustc_hash::FxHasher;
15use serde::{Deserialize, Serialize};
16use tracing::Span;
17use turbo_rcstr::RcStr;
18
19use crate::{
20 RawVc, ReadCellOptions, ReadRef, SharedReference, TaskId, TaskIdSet, TraitRef, TraitTypeId,
21 TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType,
22 event::EventListener,
23 macro_helpers::NativeFunction,
24 magic_any::MagicAny,
25 manager::{ReadConsistency, TurboTasksBackendApi},
26 raw_vc::CellId,
27 registry,
28 task::shared_reference::TypedSharedReference,
29 task_statistics::TaskStatisticsApi,
30 triomphe_utils::unchecked_sidecast_triomphe_arc,
31};
32
/// Boxed, thread-safe (`Send + Sync`) closure that can be invoked repeatedly (`Fn`); every call
/// returns a boxed, pinned, `Send` future resolving to `Result<RawVc>`.
pub type TransientTaskRoot =
    Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> + Send + Sync>;
35
/// The work backing a transient task, as handed to [`Backend::create_transient_task`].
pub enum TransientTaskType {
    /// Holds a [`TransientTaskRoot`] factory closure, so a new future can be produced on every
    /// call.
    // NOTE(review): presumably this is what allows the task to be re-executed — confirm
    // against the backend implementation.
    Root(TransientTaskRoot),

    /// Holds a single, already constructed `'static` future; there is no factory, so only this
    /// one future is available to drive.
    Once(Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>),
}
55
56impl Debug for TransientTaskType {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 match self {
59 Self::Root(_) => f.debug_tuple("Root").finish(),
60 Self::Once(_) => f.debug_tuple("Once").finish(),
61 }
62 }
63}
64
/// Describes a function-call task by three parts: the native function, an optional `this`
/// value, and the boxed argument.
///
/// `Debug` and `Eq` are derived; `PartialEq`, `Hash`, `Display` and the serde impls are written
/// by hand elsewhere in this file.
#[derive(Debug, Eq)]
pub struct CachedTaskType {
    /// The function this task refers to (a `'static` reference).
    pub native_fn: &'static NativeFunction,
    /// Optional `this` value.
    // NOTE(review): presumably the receiver for method-style functions — confirm.
    pub this: Option<RawVc>,
    /// Type-erased argument value.
    pub arg: Box<dyn MagicAny>,
}
73
74impl CachedTaskType {
75 pub fn get_name(&self) -> &'static str {
78 self.native_fn.name
79 }
80}
81
82impl PartialEq for CachedTaskType {
85 #[expect(clippy::op_ref)]
86 fn eq(&self, other: &Self) -> bool {
87 self.native_fn == other.native_fn && self.this == other.this && &self.arg == &other.arg
88 }
89}
90
91impl Hash for CachedTaskType {
94 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
95 self.native_fn.hash(state);
96 self.this.hash(state);
97 self.arg.hash(state);
98 }
99}
100
101impl Display for CachedTaskType {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 f.write_str(self.get_name())
104 }
105}
106
107mod ser {
108 use std::any::Any;
109
110 use serde::{
111 Deserialize, Deserializer, Serialize, Serializer,
112 de::{self},
113 ser::{SerializeSeq, SerializeTuple},
114 };
115
116 use super::*;
117
118 impl Serialize for TypedCellContent {
119 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
120 where
121 S: Serializer,
122 {
123 let value_type = registry::get_value_type(self.0);
124 let serializable = if let Some(value) = &self.1.0 {
125 value_type.any_as_serializable(&value.0)
126 } else {
127 None
128 };
129 let mut state = serializer.serialize_tuple(3)?;
130 state.serialize_element(registry::get_value_type_global_name(self.0))?;
131 if let Some(serializable) = serializable {
132 state.serialize_element(&true)?;
133 state.serialize_element(serializable)?;
134 } else {
135 state.serialize_element(&false)?;
136 state.serialize_element(&())?;
137 }
138 state.end()
139 }
140 }
141
142 impl<'de> Deserialize<'de> for TypedCellContent {
143 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
144 where
145 D: Deserializer<'de>,
146 {
147 struct Visitor;
148
149 impl<'de> serde::de::Visitor<'de> for Visitor {
150 type Value = TypedCellContent;
151
152 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
153 write!(formatter, "a valid TypedCellContent")
154 }
155
156 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
157 where
158 A: de::SeqAccess<'de>,
159 {
160 let value_type = seq
161 .next_element()?
162 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
163 let value_type = registry::get_value_type_id_by_global_name(value_type)
164 .ok_or_else(|| de::Error::custom("Unknown value type"))?;
165 let has_value: bool = seq
166 .next_element()?
167 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
168 if has_value {
169 let seed = registry::get_value_type(value_type)
170 .get_any_deserialize_seed()
171 .ok_or_else(|| {
172 de::Error::custom("Value type doesn't support deserialization")
173 })?;
174 let value = seq
175 .next_element_seed(seed)?
176 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
177 let arc = triomphe::Arc::<dyn Any + Send + Sync>::from(value);
178 Ok(TypedCellContent(
179 value_type,
180 CellContent(Some(SharedReference(arc))),
181 ))
182 } else {
183 let () = seq
184 .next_element()?
185 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
186 Ok(TypedCellContent(value_type, CellContent(None)))
187 }
188 }
189 }
190
191 deserializer.deserialize_tuple(2, Visitor)
192 }
193 }
194
    /// Serde helper pairing a native function with its argument.
    ///
    /// The two variants differ only in how the argument is held. In this module the
    /// `Deserialize` impl constructs `Owned`, and `CachedTaskType`'s `Serialize` impl
    /// constructs `Borrowed`.
    enum FunctionAndArg<'a> {
        /// Argument is owned (boxed).
        Owned {
            native_fn: &'static NativeFunction,
            arg: Box<dyn MagicAny>,
        },
        /// Argument is borrowed for `'a`.
        Borrowed {
            native_fn: &'static NativeFunction,
            arg: &'a dyn MagicAny,
        },
    }
205
206 impl Serialize for FunctionAndArg<'_> {
207 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
208 where
209 S: Serializer,
210 {
211 let FunctionAndArg::Borrowed { native_fn, arg } = self else {
212 unreachable!();
213 };
214 let mut state = serializer.serialize_seq(Some(2))?;
215 state.serialize_element(native_fn.global_name())?;
216 let arg = *arg;
217 let arg = native_fn.arg_meta.as_serialize(arg);
218 state.serialize_element(arg)?;
219 state.end()
220 }
221 }
222
223 impl<'de> Deserialize<'de> for FunctionAndArg<'de> {
224 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
225 struct Visitor;
226 impl<'de> serde::de::Visitor<'de> for Visitor {
227 type Value = FunctionAndArg<'de>;
228
229 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
230 write!(formatter, "a valid FunctionAndArg")
231 }
232
233 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
234 where
235 A: serde::de::SeqAccess<'de>,
236 {
237 let fn_name = seq
238 .next_element()?
239 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
240 let native_fn = registry::get_function_by_global_name(fn_name);
241 let seed = native_fn.arg_meta.deserialization_seed();
242 let arg = seq
243 .next_element_seed(seed)?
244 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
245 Ok(FunctionAndArg::Owned { native_fn, arg })
246 }
247 }
248 deserializer.deserialize_seq(Visitor)
249 }
250 }
251
252 impl Serialize for CachedTaskType {
253 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
254 where
255 S: ser::Serializer,
256 {
257 let CachedTaskType {
258 native_fn,
259 this,
260 arg,
261 } = self;
262 let mut s = serializer.serialize_tuple(2)?;
263 s.serialize_element(&FunctionAndArg::Borrowed {
264 native_fn,
265 arg: &**arg,
266 })?;
267 s.serialize_element(this)?;
268 s.end()
269 }
270 }
271
272 impl<'de> Deserialize<'de> for CachedTaskType {
273 fn deserialize<D: ser::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
274 struct Visitor;
275 impl<'de> serde::de::Visitor<'de> for Visitor {
276 type Value = CachedTaskType;
277
278 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
279 write!(formatter, "a valid PersistentTaskType")
280 }
281
282 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
283 where
284 A: serde::de::SeqAccess<'de>,
285 {
286 let FunctionAndArg::Owned { native_fn, arg } = seq
287 .next_element()?
288 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?
289 else {
290 unreachable!();
291 };
292 let this = seq
293 .next_element()?
294 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
295 Ok(CachedTaskType {
296 native_fn,
297 this,
298 arg,
299 })
300 }
301 }
302 deserializer.deserialize_tuple(2, Visitor)
303 }
304 }
305}
306
/// What [`Backend::try_start_task_execution`] returns when a task is to be executed: the future
/// to drive and the tracing span that accompanies it.
pub struct TaskExecutionSpec<'a> {
    /// The task's work; resolves to the resulting `RawVc` or an error. May borrow for `'a`.
    pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'a>>,
    /// Tracing span associated with this execution.
    pub span: Span,
}
311
/// Untyped content of a cell: either empty (`None`) or a shared reference to a value.
/// The `Default` value is the empty cell.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct CellContent(pub Option<SharedReference>);
/// A [`CellContent`] paired with the [`ValueTypeId`] of the value it holds (or would hold, when
/// the content is empty).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TypedCellContent(pub ValueTypeId, pub CellContent);
316
317impl Display for CellContent {
318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 match &self.0 {
320 None => write!(f, "empty"),
321 Some(content) => Display::fmt(content, f),
322 }
323 }
324}
325
326impl TypedCellContent {
327 pub fn cast<T: VcValueType>(self) -> Result<ReadRef<T>> {
328 let data = self.1.0.ok_or_else(|| anyhow!("Cell is empty"))?;
329 let data = data
330 .downcast::<<T::Read as VcRead<T>>::Repr>()
331 .map_err(|_err| anyhow!("Unexpected type in cell"))?;
332 let data = unsafe { unchecked_sidecast_triomphe_arc(data) };
335 Ok(ReadRef::new_arc(data))
336 }
337
338 pub fn cast_trait<T>(self) -> Result<TraitRef<T>>
343 where
344 T: VcValueTrait + ?Sized,
345 {
346 let shared_reference = self
347 .1
348 .0
349 .ok_or_else(|| anyhow!("Cell is empty"))?
350 .into_typed(self.0);
351 Ok(
352 TraitRef::new(shared_reference),
354 )
355 }
356
357 pub fn into_untyped(self) -> CellContent {
358 self.1
359 }
360}
361
362impl From<TypedSharedReference> for TypedCellContent {
363 fn from(value: TypedSharedReference) -> Self {
364 TypedCellContent(value.type_id, CellContent(Some(value.reference)))
365 }
366}
367
368impl TryFrom<TypedCellContent> for TypedSharedReference {
369 type Error = TypedCellContent;
370
371 fn try_from(content: TypedCellContent) -> Result<Self, TypedCellContent> {
372 if let TypedCellContent(type_id, CellContent(Some(reference))) = content {
373 Ok(TypedSharedReference { type_id, reference })
374 } else {
375 Err(content)
376 }
377 }
378}
379
impl CellContent {
    /// Pairs this content with `type_id`, producing a [`TypedCellContent`].
    ///
    /// No check is made here that `type_id` matches the held value.
    pub fn into_typed(self, type_id: ValueTypeId) -> TypedCellContent {
        TypedCellContent(type_id, self)
    }
}
385
386impl From<SharedReference> for CellContent {
387 fn from(value: SharedReference) -> Self {
388 CellContent(Some(value))
389 }
390}
391
392impl From<Option<SharedReference>> for CellContent {
393 fn from(value: Option<SharedReference>) -> Self {
394 CellContent(value)
395 }
396}
397
398impl TryFrom<CellContent> for SharedReference {
399 type Error = CellContent;
400
401 fn try_from(content: CellContent) -> Result<Self, CellContent> {
402 if let CellContent(Some(shared_reference)) = content {
403 Ok(shared_reference)
404 } else {
405 Err(content)
406 }
407 }
408}
409
/// Map from a collectible (`RawVc`) to a signed `i32`, hashed with `FxHasher`; this is the
/// return type of [`Backend::read_task_collectibles`].
// NOTE(review): the `i32` is presumably a (possibly negative) emit count and the trailing `1`
// an `AutoMap` size parameter — confirm against `AutoMap` and the backend implementation.
pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;
411
/// Message text of a [`TurboTasksError`], tagged by whether it is considered PII-safe.
// NOTE(review): "PII" presumably means personally identifiable information, i.e. `PIISafe`
// messages may be reported verbatim while `NonPIISafe` ones may not — confirm with the
// consumers of this type.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TurboTasksExecutionErrorMessage {
    /// Message tagged as PII-safe; may borrow a `'static` string or own one.
    PIISafe(Cow<'static, str>),
    /// Message not tagged as PII-safe; this is the variant used when converting an arbitrary
    /// `std::error::Error` in this file.
    NonPIISafe(String),
}
419
420impl Display for TurboTasksExecutionErrorMessage {
421 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422 match self {
423 TurboTasksExecutionErrorMessage::PIISafe(msg) => write!(f, "{msg}"),
424 TurboTasksExecutionErrorMessage::NonPIISafe(msg) => write!(f, "{msg}"),
425 }
426 }
427}
428
/// A plain error in the execution error chain: a message plus an optional underlying cause.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TurboTasksError {
    /// The error's own message; this is what `Display` prints for the `Error` variant.
    pub message: TurboTasksExecutionErrorMessage,
    /// The error that caused this one, if any.
    pub source: Option<TurboTasksExecutionError>,
}
434
/// An error-chain entry recording which task an underlying error passed through; rendered as
/// `Execution of {task} failed`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TurboTaskContextError {
    /// Textual description of the task.
    pub task: RcStr,
    /// The wrapped error, if any.
    pub source: Option<TurboTasksExecutionError>,
}
440
/// Cloneable, serializable error produced by task execution. Each variant holds its payload
/// behind an `Arc`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TurboTasksExecutionError {
    /// A panic payload; it reports no `source`.
    Panic(Arc<TurboTasksPanic>),
    /// A regular error with a message and an optional cause.
    Error(Arc<TurboTasksError>),
    /// A task-context wrapper around another error.
    TaskContext(Arc<TurboTaskContextError>),
}
447
448impl TurboTasksExecutionError {
449 pub fn with_task_context(&self, task: impl Display) -> Self {
450 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
451 task: RcStr::from(task.to_string()),
452 source: Some(self.clone()),
453 }))
454 }
455}
456
457impl Error for TurboTasksExecutionError {
458 fn source(&self) -> Option<&(dyn Error + 'static)> {
459 match self {
460 TurboTasksExecutionError::Panic(_panic) => None,
461 TurboTasksExecutionError::Error(error) => {
462 error.source.as_ref().map(|s| s as &dyn Error)
463 }
464 TurboTasksExecutionError::TaskContext(context_error) => {
465 context_error.source.as_ref().map(|s| s as &dyn Error)
466 }
467 }
468 }
469}
470
471impl Display for TurboTasksExecutionError {
472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473 match self {
474 TurboTasksExecutionError::Panic(panic) => write!(f, "{}", &panic),
475 TurboTasksExecutionError::Error(error) => {
476 write!(f, "{}", error.message)
477 }
478 TurboTasksExecutionError::TaskContext(context_error) => {
479 write!(f, "Execution of {} failed", context_error.task)
480 }
481 }
482 }
483}
484
485impl<'l> From<&'l (dyn std::error::Error + 'static)> for TurboTasksExecutionError {
486 fn from(err: &'l (dyn std::error::Error + 'static)) -> Self {
487 if let Some(err) = err.downcast_ref::<TurboTasksExecutionError>() {
488 return err.clone();
489 }
490 let message = err.to_string();
491 let source = err.source().map(|source| source.into());
492
493 TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
494 message: TurboTasksExecutionErrorMessage::NonPIISafe(message),
495 source,
496 }))
497 }
498}
499
500impl From<anyhow::Error> for TurboTasksExecutionError {
501 fn from(err: anyhow::Error) -> Self {
502 let current: &(dyn std::error::Error + 'static) = err.as_ref();
503 current.into()
504 }
505}
506
/// Interface implemented by a task backend.
///
/// Implementors must be `Sync + Send`. Most methods receive the calling
/// `&dyn TurboTasksBackendApi<Self>` so the backend can call back into it. When each method is
/// invoked is decided by the caller and is not visible in this file.
pub trait Backend: Sync + Send {
    /// Lifecycle hook named for startup. Default: no-op.
    #[allow(unused_variables)]
    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}

    /// Lifecycle hook named for stop. Default: no-op.
    #[allow(unused_variables)]
    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
    /// Lifecycle hook named for the "stopping" phase. Default: no-op.
    // NOTE(review): presumably called before `stop` — confirm against the caller.
    #[allow(unused_variables)]
    fn stopping(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}

    /// Hook named for the start of an idle period. Default: no-op.
    #[allow(unused_variables)]
    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
    /// Hook named for the end of an idle period. Default: no-op.
    #[allow(unused_variables)]
    fn idle_end(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}

    /// Invalidates a single task.
    fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Invalidates every task in the slice.
    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
    /// Invalidates every task in the set; same purpose as [`Self::invalidate_tasks`] but takes
    /// a `TaskIdSet`.
    fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Hook named for invalidating a task's serialization. Default: no-op.
    fn invalidate_serialization(
        &self,
        _task: TaskId,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Returns a textual description of `task`.
    fn get_task_description(&self, task: TaskId) -> String;

    /// Per-task state type chosen by the backend; created by [`Self::new_task_state`].
    /// Must be `Send + Sync + 'static`.
    type TaskState: Send + Sync + 'static;

    /// Creates a fresh [`Self::TaskState`] for `task`.
    fn new_task_state(&self, task: TaskId) -> Self::TaskState;

    /// Attempts to start executing `task`.
    ///
    /// Returns the future and tracing span to run, or `None` when no execution is started.
    /// The returned spec may borrow from `self` for `'a`.
    fn try_start_task_execution<'a>(
        &'a self,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Option<TaskExecutionSpec<'a>>;

    /// Reports that the execution of `task` was canceled.
    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Reports the outcome of executing `task_id`: the resulting `RawVc` or the execution
    /// error.
    fn task_execution_result(
        &self,
        task_id: TaskId,
        result: Result<RawVc, TurboTasksExecutionError>,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Reports that the execution of `task` has completed, together with measurements and
    /// flags collected during the run (duration, memory usage, per-value-type cell counters,
    /// `stateful`, `has_invalidator`).
    // NOTE(review): the meaning of the returned `bool` is defined by the caller/implementor
    // and is not visible here — confirm before relying on it.
    fn task_execution_completed(
        &self,
        task: TaskId,
        duration: Duration,
        memory_usage: usize,
        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
        stateful: bool,
        has_invalidator: bool,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> bool;

    /// Backend-defined job type executed through [`Self::run_backend_job`].
    type BackendJob: Send + 'static;

    /// Runs a backend job; the returned future may borrow `self` and `turbo_tasks` for `'a`.
    fn run_backend_job<'a>(
        &'a self,
        job: Self::BackendJob,
        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

    /// Tries to read the output of `task` on behalf of `reader`.
    ///
    /// The outer `Result` carries errors; the inner one is either the output (`Ok`) or an
    /// [`EventListener`] (`Err`).
    // NOTE(review): the listener is presumably to be awaited before retrying — confirm.
    fn try_read_task_output(
        &self,
        task: TaskId,
        reader: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Like [`Self::try_read_task_output`] but without a `reader` task.
    // NOTE(review): "untracked" presumably means no dependency on `task` is recorded, so
    // misuse could break invalidation — confirm.
    fn try_read_task_output_untracked(
        &self,
        task: TaskId,
        consistency: ReadConsistency,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<RawVc, EventListener>>;

    /// Tries to read cell `index` of `task` on behalf of `reader`.
    ///
    /// The outer `Result` carries errors; the inner one is either the typed cell content
    /// (`Ok`) or an [`EventListener`] (`Err`).
    fn try_read_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        reader: TaskId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Like [`Self::try_read_task_cell`] but without a `reader` task.
    // NOTE(review): "untracked" presumably means no dependency is recorded — confirm.
    fn try_read_task_cell_untracked(
        &self,
        task: TaskId,
        index: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<Result<TypedCellContent, EventListener>>;

    /// Reads cell `index` of `current_task` without a `reader` task.
    ///
    /// The default implementation delegates to [`Self::try_read_task_cell_untracked`]. Errors
    /// are propagated; if that call yields a listener instead of content, the listener is
    /// discarded and empty content carrying `index.type_id` is returned.
    fn try_read_own_task_cell_untracked(
        &self,
        current_task: TaskId,
        index: CellId,
        options: ReadCellOptions,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> Result<TypedCellContent> {
        match self.try_read_task_cell_untracked(current_task, index, options, turbo_tasks)? {
            Ok(content) => Ok(content),
            Err(_) => Ok(TypedCellContent(index.type_id, CellContent(None))),
        }
    }

    /// Reads the collectibles of `task` for trait `trait_id` on behalf of `reader`.
    fn read_task_collectibles(
        &self,
        task: TaskId,
        trait_id: TraitTypeId,
        reader: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskCollectiblesMap;

    /// Emits `collectible` of trait `trait_type` from `task`.
    fn emit_collectible(
        &self,
        trait_type: TraitTypeId,
        collectible: RawVc,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Counterpart of [`Self::emit_collectible`] that takes a `count`.
    // NOTE(review): presumably retracts `count` emissions of `collectible` — confirm.
    fn unemit_collectible(
        &self,
        trait_type: TraitTypeId,
        collectible: RawVc,
        count: u32,
        task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Stores `content` into cell `index` of `task`.
    fn update_task_cell(
        &self,
        task: TaskId,
        index: CellId,
        content: CellContent,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Returns the id of the persistent task for `task_type`, creating it if needed, in the
    /// context of `parent_task`.
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId;

    /// Returns the id of the transient task for `task_type`, creating it if needed, in the
    /// context of `parent_task`.
    fn get_or_create_transient_task(
        &self,
        task_type: CachedTaskType,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId;

    /// Connects `task` to `parent_task`.
    fn connect_task(
        &self,
        task: TaskId,
        parent_task: TaskId,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    );

    /// Hook named for marking the task as finished. Default: no-op.
    fn mark_own_task_as_finished(
        &self,
        _task: TaskId,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Hook named for setting the task's aggregation number. Default: no-op.
    fn set_own_task_aggregation_number(
        &self,
        _task: TaskId,
        _aggregation_number: u32,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Hook named for marking the task as session dependent. Default: no-op.
    fn mark_own_task_as_session_dependent(
        &self,
        _task: TaskId,
        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) {
    }

    /// Creates a transient task from a [`TransientTaskType`] and returns its id.
    fn create_transient_task(
        &self,
        task_type: TransientTaskType,
        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
    ) -> TaskId;

    /// Disposes the root task `task`.
    fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

    /// Returns the backend's task statistics API.
    fn task_statistics(&self) -> &TaskStatisticsApi;
}