1use std::{
2 error::Error,
3 fmt::{self, Debug, Display},
4 future::Future,
5 hash::{BuildHasherDefault, Hash},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use anyhow::{Result, anyhow};
12use auto_hash_map::AutoMap;
13use rustc_hash::FxHasher;
14use tracing::Span;
15
16pub use crate::id::BackendJobId;
17use crate::{
18 FunctionId, RawVc, ReadCellOptions, ReadRef, SharedReference, TaskId, TaskIdSet, TraitRef,
19 TraitTypeId, TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType,
20 event::EventListener,
21 magic_any::MagicAny,
22 manager::{ReadConsistency, TurboTasksBackendApi},
23 raw_vc::CellId,
24 registry,
25 task::shared_reference::TypedSharedReference,
26 task_statistics::TaskStatisticsApi,
27 triomphe_utils::unchecked_sidecast_triomphe_arc,
28};
29
30pub type TransientTaskRoot =
31 Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> + Send + Sync>;
32
33pub enum TransientTaskType {
34 Root(TransientTaskRoot),
40
41 Once(Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>),
51}
52
53impl Debug for TransientTaskType {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 Self::Root(_) => f.debug_tuple("Root").finish(),
57 Self::Once(_) => f.debug_tuple("Once").finish(),
58 }
59 }
60}
61
62#[derive(Debug, Eq)]
65pub struct CachedTaskType {
66 pub fn_type: FunctionId,
67 pub this: Option<RawVc>,
68 pub arg: Box<dyn MagicAny>,
69}
70
71impl CachedTaskType {
72 pub fn get_name(&self) -> &'static str {
75 ®istry::get_function(self.fn_type).name
76 }
77}
78
79impl PartialEq for CachedTaskType {
82 #[expect(clippy::op_ref)]
83 fn eq(&self, other: &Self) -> bool {
84 self.fn_type == other.fn_type && self.this == other.this && &self.arg == &other.arg
85 }
86}
87
88impl Hash for CachedTaskType {
91 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
92 self.fn_type.hash(state);
93 self.this.hash(state);
94 self.arg.hash(state);
95 }
96}
97
98impl Display for CachedTaskType {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.write_str(self.get_name())
101 }
102}
103
104mod ser {
105 use std::any::Any;
106
107 use serde::{
108 Deserialize, Deserializer, Serialize, Serializer,
109 de::{self},
110 ser::{SerializeSeq, SerializeTuple},
111 };
112
113 use super::*;
114
115 impl Serialize for TypedCellContent {
116 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
117 where
118 S: Serializer,
119 {
120 let value_type = registry::get_value_type(self.0);
121 let serializable = if let Some(value) = &self.1.0 {
122 value_type.any_as_serializable(&value.0)
123 } else {
124 None
125 };
126 let mut state = serializer.serialize_tuple(3)?;
127 state.serialize_element(registry::get_value_type_global_name(self.0))?;
128 if let Some(serializable) = serializable {
129 state.serialize_element(&true)?;
130 state.serialize_element(serializable)?;
131 } else {
132 state.serialize_element(&false)?;
133 state.serialize_element(&())?;
134 }
135 state.end()
136 }
137 }
138
139 impl<'de> Deserialize<'de> for TypedCellContent {
140 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
141 where
142 D: Deserializer<'de>,
143 {
144 struct Visitor;
145
146 impl<'de> serde::de::Visitor<'de> for Visitor {
147 type Value = TypedCellContent;
148
149 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
150 write!(formatter, "a valid TypedCellContent")
151 }
152
153 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
154 where
155 A: de::SeqAccess<'de>,
156 {
157 let value_type = seq
158 .next_element()?
159 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
160 let value_type = registry::get_value_type_id_by_global_name(value_type)
161 .ok_or_else(|| de::Error::custom("Unknown value type"))?;
162 let has_value: bool = seq
163 .next_element()?
164 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
165 if has_value {
166 let seed = registry::get_value_type(value_type)
167 .get_any_deserialize_seed()
168 .ok_or_else(|| {
169 de::Error::custom("Value type doesn't support deserialization")
170 })?;
171 let value = seq
172 .next_element_seed(seed)?
173 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
174 let arc = triomphe::Arc::<dyn Any + Send + Sync>::from(value);
175 Ok(TypedCellContent(
176 value_type,
177 CellContent(Some(SharedReference(arc))),
178 ))
179 } else {
180 let () = seq
181 .next_element()?
182 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
183 Ok(TypedCellContent(value_type, CellContent(None)))
184 }
185 }
186 }
187
188 deserializer.deserialize_tuple(2, Visitor)
189 }
190 }
191
192 enum FunctionAndArg<'a> {
193 Owned {
194 fn_type: FunctionId,
195 arg: Box<dyn MagicAny>,
196 },
197 Borrowed {
198 fn_type: FunctionId,
199 arg: &'a dyn MagicAny,
200 },
201 }
202
203 impl Serialize for FunctionAndArg<'_> {
204 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
205 where
206 S: Serializer,
207 {
208 let FunctionAndArg::Borrowed { fn_type, arg } = self else {
209 unreachable!();
210 };
211 let mut state = serializer.serialize_seq(Some(2))?;
212 state.serialize_element(&fn_type)?;
213 let arg = *arg;
214 let arg = registry::get_function(*fn_type).arg_meta.as_serialize(arg);
215 state.serialize_element(arg)?;
216 state.end()
217 }
218 }
219
220 impl<'de> Deserialize<'de> for FunctionAndArg<'de> {
221 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
222 struct Visitor;
223 impl<'de> serde::de::Visitor<'de> for Visitor {
224 type Value = FunctionAndArg<'de>;
225
226 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
227 write!(formatter, "a valid FunctionAndArg")
228 }
229
230 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
231 where
232 A: serde::de::SeqAccess<'de>,
233 {
234 let fn_type = seq
235 .next_element()?
236 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
237 let seed = registry::get_function(fn_type)
238 .arg_meta
239 .deserialization_seed();
240 let arg = seq
241 .next_element_seed(seed)?
242 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
243 Ok(FunctionAndArg::Owned { fn_type, arg })
244 }
245 }
246 deserializer.deserialize_seq(Visitor)
247 }
248 }
249
250 impl Serialize for CachedTaskType {
251 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
252 where
253 S: ser::Serializer,
254 {
255 let CachedTaskType { fn_type, this, arg } = self;
256 let mut s = serializer.serialize_tuple(2)?;
257 s.serialize_element(&FunctionAndArg::Borrowed {
258 fn_type: *fn_type,
259 arg: &**arg,
260 })?;
261 s.serialize_element(this)?;
262 s.end()
263 }
264 }
265
266 impl<'de> Deserialize<'de> for CachedTaskType {
267 fn deserialize<D: ser::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
268 struct Visitor;
269 impl<'de> serde::de::Visitor<'de> for Visitor {
270 type Value = CachedTaskType;
271
272 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
273 write!(formatter, "a valid PersistentTaskType")
274 }
275
276 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
277 where
278 A: serde::de::SeqAccess<'de>,
279 {
280 let FunctionAndArg::Owned { fn_type, arg } = seq
281 .next_element()?
282 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?
283 else {
284 unreachable!();
285 };
286 let this = seq
287 .next_element()?
288 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
289 Ok(CachedTaskType { fn_type, this, arg })
290 }
291 }
292 deserializer.deserialize_tuple(2, Visitor)
293 }
294 }
295}
296
297pub struct TaskExecutionSpec<'a> {
298 pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'a>>,
299 pub span: Span,
300}
301
302#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
303pub struct CellContent(pub Option<SharedReference>);
304#[derive(Clone, Debug, PartialEq, Eq, Hash)]
305pub struct TypedCellContent(pub ValueTypeId, pub CellContent);
306
307impl Display for CellContent {
308 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309 match &self.0 {
310 None => write!(f, "empty"),
311 Some(content) => Display::fmt(content, f),
312 }
313 }
314}
315
316impl TypedCellContent {
317 pub fn cast<T: VcValueType>(self) -> Result<ReadRef<T>> {
318 let data = self.1.0.ok_or_else(|| anyhow!("Cell is empty"))?;
319 let data = data
320 .downcast::<<T::Read as VcRead<T>>::Repr>()
321 .map_err(|_err| anyhow!("Unexpected type in cell"))?;
322 let data = unsafe { unchecked_sidecast_triomphe_arc(data) };
325 Ok(ReadRef::new_arc(data))
326 }
327
328 pub fn cast_trait<T>(self) -> Result<TraitRef<T>>
333 where
334 T: VcValueTrait + ?Sized,
335 {
336 let shared_reference = self
337 .1
338 .0
339 .ok_or_else(|| anyhow!("Cell is empty"))?
340 .into_typed(self.0);
341 Ok(
342 TraitRef::new(shared_reference),
344 )
345 }
346
347 pub fn into_untyped(self) -> CellContent {
348 self.1
349 }
350}
351
352impl From<TypedSharedReference> for TypedCellContent {
353 fn from(value: TypedSharedReference) -> Self {
354 TypedCellContent(value.0, CellContent(Some(value.1)))
355 }
356}
357
358impl TryFrom<TypedCellContent> for TypedSharedReference {
359 type Error = TypedCellContent;
360
361 fn try_from(content: TypedCellContent) -> Result<Self, TypedCellContent> {
362 if let TypedCellContent(type_id, CellContent(Some(shared_reference))) = content {
363 Ok(TypedSharedReference(type_id, shared_reference))
364 } else {
365 Err(content)
366 }
367 }
368}
369
370impl CellContent {
371 pub fn into_typed(self, type_id: ValueTypeId) -> TypedCellContent {
372 TypedCellContent(type_id, self)
373 }
374}
375
376impl From<SharedReference> for CellContent {
377 fn from(value: SharedReference) -> Self {
378 CellContent(Some(value))
379 }
380}
381
382impl From<Option<SharedReference>> for CellContent {
383 fn from(value: Option<SharedReference>) -> Self {
384 CellContent(value)
385 }
386}
387
388impl TryFrom<CellContent> for SharedReference {
389 type Error = CellContent;
390
391 fn try_from(content: CellContent) -> Result<Self, CellContent> {
392 if let CellContent(Some(shared_reference)) = content {
393 Ok(shared_reference)
394 } else {
395 Err(content)
396 }
397 }
398}
399
400pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;
401
402#[derive(Clone, Debug)]
405pub enum TurboTasksExecutionErrorMessage {
406 PIISafe(&'static str),
407 NonPIISafe(String),
408}
409
410impl Display for TurboTasksExecutionErrorMessage {
411 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412 match self {
413 TurboTasksExecutionErrorMessage::PIISafe(msg) => write!(f, "{msg}"),
414 TurboTasksExecutionErrorMessage::NonPIISafe(msg) => write!(f, "{msg}"),
415 }
416 }
417}
418
419#[derive(Clone, Debug)]
420pub enum TurboTasksExecutionError {
421 Panic(Arc<TurboTasksPanic>),
422 Error {
423 message: TurboTasksExecutionErrorMessage,
424 source: Option<Arc<TurboTasksExecutionError>>,
425 },
426}
427
428impl Error for TurboTasksExecutionError {
429 fn source(&self) -> Option<&(dyn Error + 'static)> {
430 match self {
431 TurboTasksExecutionError::Panic(_panic) => None,
432 TurboTasksExecutionError::Error { source, .. } => {
433 source.as_ref().map(|s| s as &dyn Error)
434 }
435 }
436 }
437}
438
439impl Display for TurboTasksExecutionError {
440 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441 match self {
442 TurboTasksExecutionError::Panic(panic) => write!(f, "{}", &panic),
443 TurboTasksExecutionError::Error { message, .. } => {
444 write!(f, "{message}")
445 }
446 }
447 }
448}
449
450impl From<anyhow::Error> for TurboTasksExecutionError {
451 fn from(err: anyhow::Error) -> Self {
452 let mut current: &dyn std::error::Error = err.as_ref();
453 let mut message = current.to_string();
454 let mut source_exec_error = None;
455
456 while let Some(current_source) = current.source() {
459 if let Some(turbo_error) =
460 current_source.downcast_ref::<Arc<TurboTasksExecutionError>>()
461 {
462 source_exec_error = Some(turbo_error.clone());
463 break;
464 }
465 message.push_str(&format!("\n{current_source}"));
466 current = current_source;
467 }
468
469 TurboTasksExecutionError::Error {
470 message: TurboTasksExecutionErrorMessage::NonPIISafe(message),
471 source: source_exec_error,
472 }
473 }
474}
475
476pub trait Backend: Sync + Send {
477 #[allow(unused_variables)]
478 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
479
480 #[allow(unused_variables)]
481 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
482 #[allow(unused_variables)]
483 fn stopping(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
484
485 #[allow(unused_variables)]
486 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
487 #[allow(unused_variables)]
488 fn idle_end(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
489
490 fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
491
492 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
493 fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
494
495 fn invalidate_serialization(
496 &self,
497 _task: TaskId,
498 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
499 ) {
500 }
501
502 fn get_task_description(&self, task: TaskId) -> String;
503
504 type TaskState: Send + Sync + 'static;
516
517 fn new_task_state(&self, task: TaskId) -> Self::TaskState;
526
527 fn try_start_task_execution<'a>(
528 &'a self,
529 task: TaskId,
530 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
531 ) -> Option<TaskExecutionSpec<'a>>;
532
533 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
534
535 fn task_execution_result(
536 &self,
537 task_id: TaskId,
538 result: Result<RawVc, Arc<TurboTasksExecutionError>>,
539 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
540 );
541
542 fn task_execution_completed(
543 &self,
544 task: TaskId,
545 duration: Duration,
546 memory_usage: usize,
547 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
548 stateful: bool,
549 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
550 ) -> bool;
551
552 fn run_backend_job<'a>(
553 &'a self,
554 id: BackendJobId,
555 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
556 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
557
558 fn try_read_task_output(
559 &self,
560 task: TaskId,
561 reader: TaskId,
562 consistency: ReadConsistency,
563 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
564 ) -> Result<Result<RawVc, EventListener>>;
565
566 fn try_read_task_output_untracked(
569 &self,
570 task: TaskId,
571 consistency: ReadConsistency,
572 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
573 ) -> Result<Result<RawVc, EventListener>>;
574
575 fn try_read_task_cell(
576 &self,
577 task: TaskId,
578 index: CellId,
579 reader: TaskId,
580 options: ReadCellOptions,
581 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
582 ) -> Result<Result<TypedCellContent, EventListener>>;
583
584 fn try_read_task_cell_untracked(
587 &self,
588 task: TaskId,
589 index: CellId,
590 options: ReadCellOptions,
591 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
592 ) -> Result<Result<TypedCellContent, EventListener>>;
593
594 fn try_read_own_task_cell_untracked(
597 &self,
598 current_task: TaskId,
599 index: CellId,
600 options: ReadCellOptions,
601 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
602 ) -> Result<TypedCellContent> {
603 match self.try_read_task_cell_untracked(current_task, index, options, turbo_tasks)? {
604 Ok(content) => Ok(content),
605 Err(_) => Ok(TypedCellContent(index.type_id, CellContent(None))),
606 }
607 }
608
609 fn read_task_collectibles(
610 &self,
611 task: TaskId,
612 trait_id: TraitTypeId,
613 reader: TaskId,
614 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
615 ) -> TaskCollectiblesMap;
616
617 fn emit_collectible(
618 &self,
619 trait_type: TraitTypeId,
620 collectible: RawVc,
621 task: TaskId,
622 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
623 );
624
625 fn unemit_collectible(
626 &self,
627 trait_type: TraitTypeId,
628 collectible: RawVc,
629 count: u32,
630 task: TaskId,
631 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
632 );
633
634 fn update_task_cell(
635 &self,
636 task: TaskId,
637 index: CellId,
638 content: CellContent,
639 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
640 );
641
642 fn get_or_create_persistent_task(
643 &self,
644 task_type: CachedTaskType,
645 parent_task: TaskId,
646 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
647 ) -> TaskId;
648
649 fn get_or_create_transient_task(
650 &self,
651 task_type: CachedTaskType,
652 parent_task: TaskId,
653 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
654 ) -> TaskId;
655
656 fn try_get_function_id(&self, task_id: TaskId) -> Option<FunctionId>;
659
660 fn connect_task(
661 &self,
662 task: TaskId,
663 parent_task: TaskId,
664 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
665 );
666
667 fn mark_own_task_as_finished(
668 &self,
669 _task: TaskId,
670 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
671 ) {
672 }
674
675 fn set_own_task_aggregation_number(
676 &self,
677 _task: TaskId,
678 _aggregation_number: u32,
679 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
680 ) {
681 }
683
684 fn mark_own_task_as_session_dependent(
685 &self,
686 _task: TaskId,
687 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
688 ) {
689 }
691
692 fn create_transient_task(
693 &self,
694 task_type: TransientTaskType,
695 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
696 ) -> TaskId;
697
698 fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
699
700 fn task_statistics(&self) -> &TaskStatisticsApi;
701}