1use std::{
2 borrow::Cow,
3 error::Error,
4 fmt::{self, Debug, Display},
5 future::Future,
6 hash::{BuildHasherDefault, Hash},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use anyhow::{Result, anyhow};
13use auto_hash_map::AutoMap;
14use rustc_hash::FxHasher;
15use serde::{Deserialize, Serialize};
16use tracing::Span;
17use turbo_rcstr::RcStr;
18
19use crate::{
20 RawVc, ReadCellOptions, ReadOutputOptions, ReadRef, SharedReference, TaskId, TaskIdSet,
21 TraitRef, TraitTypeId, TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType,
22 event::EventListener, macro_helpers::NativeFunction, magic_any::MagicAny,
23 manager::TurboTasksBackendApi, raw_vc::CellId, registry,
24 task::shared_reference::TypedSharedReference, task_statistics::TaskStatisticsApi,
25 triomphe_utils::unchecked_sidecast_triomphe_arc,
26};
27
28pub type TransientTaskRoot =
29 Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> + Send + Sync>;
30
31pub enum TransientTaskType {
32 Root(TransientTaskRoot),
38
39 Once(Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>),
49}
50
51impl Debug for TransientTaskType {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
54 Self::Root(_) => f.debug_tuple("Root").finish(),
55 Self::Once(_) => f.debug_tuple("Once").finish(),
56 }
57 }
58}
59
60#[derive(Debug, Eq)]
63pub struct CachedTaskType {
64 pub native_fn: &'static NativeFunction,
65 pub this: Option<RawVc>,
66 pub arg: Box<dyn MagicAny>,
67}
68
69impl CachedTaskType {
70 pub fn get_name(&self) -> &'static str {
73 self.native_fn.name
74 }
75}
76
77impl PartialEq for CachedTaskType {
80 #[expect(clippy::op_ref)]
81 fn eq(&self, other: &Self) -> bool {
82 self.native_fn == other.native_fn && self.this == other.this && &self.arg == &other.arg
83 }
84}
85
86impl Hash for CachedTaskType {
89 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
90 self.native_fn.hash(state);
91 self.this.hash(state);
92 self.arg.hash(state);
93 }
94}
95
96impl Display for CachedTaskType {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 f.write_str(self.get_name())
99 }
100}
101
102mod ser {
103 use std::any::Any;
104
105 use serde::{
106 Deserialize, Deserializer, Serialize, Serializer,
107 de::{self},
108 ser::{SerializeSeq, SerializeTuple},
109 };
110
111 use super::*;
112
113 impl Serialize for TypedCellContent {
114 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
115 where
116 S: Serializer,
117 {
118 let value_type = registry::get_value_type(self.0);
119 let serializable = if let Some(value) = &self.1.0 {
120 value_type.any_as_serializable(&value.0)
121 } else {
122 None
123 };
124 let mut state = serializer.serialize_tuple(3)?;
125 state.serialize_element(&self.0)?;
126 if let Some(serializable) = serializable {
127 state.serialize_element(&true)?;
128 state.serialize_element(serializable)?;
129 } else {
130 state.serialize_element(&false)?;
131 state.serialize_element(&())?;
132 }
133 state.end()
134 }
135 }
136
137 impl<'de> Deserialize<'de> for TypedCellContent {
138 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
139 where
140 D: Deserializer<'de>,
141 {
142 struct Visitor;
143
144 impl<'de> serde::de::Visitor<'de> for Visitor {
145 type Value = TypedCellContent;
146
147 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
148 write!(formatter, "a valid TypedCellContent")
149 }
150
151 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
152 where
153 A: de::SeqAccess<'de>,
154 {
155 let value_type: ValueTypeId = seq
156 .next_element()?
157 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
158 let has_value: bool = seq
159 .next_element()?
160 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
161 if has_value {
162 let seed = registry::get_value_type(value_type)
163 .get_any_deserialize_seed()
164 .ok_or_else(|| {
165 de::Error::custom("Value type doesn't support deserialization")
166 })?;
167 let value = seq
168 .next_element_seed(seed)?
169 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
170 let arc = triomphe::Arc::<dyn Any + Send + Sync>::from(value);
171 Ok(TypedCellContent(
172 value_type,
173 CellContent(Some(SharedReference(arc))),
174 ))
175 } else {
176 let () = seq
177 .next_element()?
178 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
179 Ok(TypedCellContent(value_type, CellContent(None)))
180 }
181 }
182 }
183
184 deserializer.deserialize_tuple(2, Visitor)
185 }
186 }
187
188 enum FunctionAndArg<'a> {
189 Owned {
190 native_fn: &'static NativeFunction,
191 arg: Box<dyn MagicAny>,
192 },
193 Borrowed {
194 native_fn: &'static NativeFunction,
195 arg: &'a dyn MagicAny,
196 },
197 }
198
199 impl Serialize for FunctionAndArg<'_> {
200 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
201 where
202 S: Serializer,
203 {
204 let FunctionAndArg::Borrowed { native_fn, arg } = self else {
205 unreachable!();
206 };
207 let mut state = serializer.serialize_seq(Some(2))?;
208 state.serialize_element(®istry::get_function_id(native_fn))?;
209 let arg = *arg;
210 let arg = native_fn.arg_meta.as_serialize(arg);
211 state.serialize_element(arg)?;
212 state.end()
213 }
214 }
215
216 impl<'de> Deserialize<'de> for FunctionAndArg<'de> {
217 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
218 struct Visitor;
219 impl<'de> serde::de::Visitor<'de> for Visitor {
220 type Value = FunctionAndArg<'de>;
221
222 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
223 write!(formatter, "a valid FunctionAndArg")
224 }
225
226 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
227 where
228 A: serde::de::SeqAccess<'de>,
229 {
230 let fn_id = seq
231 .next_element()?
232 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
233 let native_fn = registry::get_native_function(fn_id);
234 let seed = native_fn.arg_meta.deserialization_seed();
235 let arg = seq
236 .next_element_seed(seed)?
237 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
238 Ok(FunctionAndArg::Owned { native_fn, arg })
239 }
240 }
241 deserializer.deserialize_seq(Visitor)
242 }
243 }
244
245 impl Serialize for CachedTaskType {
246 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
247 where
248 S: ser::Serializer,
249 {
250 let CachedTaskType {
251 native_fn,
252 this,
253 arg,
254 } = self;
255 let mut s = serializer.serialize_tuple(2)?;
256 s.serialize_element(&FunctionAndArg::Borrowed {
257 native_fn,
258 arg: &**arg,
259 })?;
260 s.serialize_element(this)?;
261 s.end()
262 }
263 }
264
265 impl<'de> Deserialize<'de> for CachedTaskType {
266 fn deserialize<D: ser::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
267 struct Visitor;
268 impl<'de> serde::de::Visitor<'de> for Visitor {
269 type Value = CachedTaskType;
270
271 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
272 write!(formatter, "a valid PersistentTaskType")
273 }
274
275 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
276 where
277 A: serde::de::SeqAccess<'de>,
278 {
279 let FunctionAndArg::Owned { native_fn, arg } = seq
280 .next_element()?
281 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?
282 else {
283 unreachable!();
284 };
285 let this = seq
286 .next_element()?
287 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
288 Ok(CachedTaskType {
289 native_fn,
290 this,
291 arg,
292 })
293 }
294 }
295 deserializer.deserialize_tuple(2, Visitor)
296 }
297 }
298}
299
300pub struct TaskExecutionSpec<'a> {
301 pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'a>>,
302 pub span: Span,
303}
304
305#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
306pub struct CellContent(pub Option<SharedReference>);
307#[derive(Clone, Debug, PartialEq, Eq, Hash)]
308pub struct TypedCellContent(pub ValueTypeId, pub CellContent);
309
310impl Display for CellContent {
311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 match &self.0 {
313 None => write!(f, "empty"),
314 Some(content) => Display::fmt(content, f),
315 }
316 }
317}
318
319impl TypedCellContent {
320 pub fn cast<T: VcValueType>(self) -> Result<ReadRef<T>> {
321 let data = self.1.0.ok_or_else(|| anyhow!("Cell is empty"))?;
322 let data = data
323 .downcast::<<T::Read as VcRead<T>>::Repr>()
324 .map_err(|_err| anyhow!("Unexpected type in cell"))?;
325 let data = unsafe { unchecked_sidecast_triomphe_arc(data) };
328 Ok(ReadRef::new_arc(data))
329 }
330
331 pub fn cast_trait<T>(self) -> Result<TraitRef<T>>
336 where
337 T: VcValueTrait + ?Sized,
338 {
339 let shared_reference = self
340 .1
341 .0
342 .ok_or_else(|| anyhow!("Cell is empty"))?
343 .into_typed(self.0);
344 Ok(
345 TraitRef::new(shared_reference),
347 )
348 }
349
350 pub fn into_untyped(self) -> CellContent {
351 self.1
352 }
353}
354
355impl From<TypedSharedReference> for TypedCellContent {
356 fn from(value: TypedSharedReference) -> Self {
357 TypedCellContent(value.type_id, CellContent(Some(value.reference)))
358 }
359}
360
361impl TryFrom<TypedCellContent> for TypedSharedReference {
362 type Error = TypedCellContent;
363
364 fn try_from(content: TypedCellContent) -> Result<Self, TypedCellContent> {
365 if let TypedCellContent(type_id, CellContent(Some(reference))) = content {
366 Ok(TypedSharedReference { type_id, reference })
367 } else {
368 Err(content)
369 }
370 }
371}
372
373impl CellContent {
374 pub fn into_typed(self, type_id: ValueTypeId) -> TypedCellContent {
375 TypedCellContent(type_id, self)
376 }
377}
378
379impl From<SharedReference> for CellContent {
380 fn from(value: SharedReference) -> Self {
381 CellContent(Some(value))
382 }
383}
384
385impl From<Option<SharedReference>> for CellContent {
386 fn from(value: Option<SharedReference>) -> Self {
387 CellContent(value)
388 }
389}
390
391impl TryFrom<CellContent> for SharedReference {
392 type Error = CellContent;
393
394 fn try_from(content: CellContent) -> Result<Self, CellContent> {
395 if let CellContent(Some(shared_reference)) = content {
396 Ok(shared_reference)
397 } else {
398 Err(content)
399 }
400 }
401}
402
403pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;
404
405#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
408pub enum TurboTasksExecutionErrorMessage {
409 PIISafe(Cow<'static, str>),
410 NonPIISafe(String),
411}
412
413impl Display for TurboTasksExecutionErrorMessage {
414 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415 match self {
416 TurboTasksExecutionErrorMessage::PIISafe(msg) => write!(f, "{msg}"),
417 TurboTasksExecutionErrorMessage::NonPIISafe(msg) => write!(f, "{msg}"),
418 }
419 }
420}
421
422#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
423pub struct TurboTasksError {
424 pub message: TurboTasksExecutionErrorMessage,
425 pub source: Option<TurboTasksExecutionError>,
426}
427
428#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
429pub struct TurboTaskContextError {
430 pub task: RcStr,
431 pub source: Option<TurboTasksExecutionError>,
432}
433
434#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
435pub enum TurboTasksExecutionError {
436 Panic(Arc<TurboTasksPanic>),
437 Error(Arc<TurboTasksError>),
438 TaskContext(Arc<TurboTaskContextError>),
439}
440
441impl TurboTasksExecutionError {
442 pub fn with_task_context(&self, task: impl Display) -> Self {
443 TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
444 task: RcStr::from(task.to_string()),
445 source: Some(self.clone()),
446 }))
447 }
448}
449
450impl Error for TurboTasksExecutionError {
451 fn source(&self) -> Option<&(dyn Error + 'static)> {
452 match self {
453 TurboTasksExecutionError::Panic(_panic) => None,
454 TurboTasksExecutionError::Error(error) => {
455 error.source.as_ref().map(|s| s as &dyn Error)
456 }
457 TurboTasksExecutionError::TaskContext(context_error) => {
458 context_error.source.as_ref().map(|s| s as &dyn Error)
459 }
460 }
461 }
462}
463
464impl Display for TurboTasksExecutionError {
465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466 match self {
467 TurboTasksExecutionError::Panic(panic) => write!(f, "{}", &panic),
468 TurboTasksExecutionError::Error(error) => {
469 write!(f, "{}", error.message)
470 }
471 TurboTasksExecutionError::TaskContext(context_error) => {
472 write!(f, "Execution of {} failed", context_error.task)
473 }
474 }
475 }
476}
477
478impl<'l> From<&'l (dyn std::error::Error + 'static)> for TurboTasksExecutionError {
479 fn from(err: &'l (dyn std::error::Error + 'static)) -> Self {
480 if let Some(err) = err.downcast_ref::<TurboTasksExecutionError>() {
481 return err.clone();
482 }
483 let message = err.to_string();
484 let source = err.source().map(|source| source.into());
485
486 TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
487 message: TurboTasksExecutionErrorMessage::NonPIISafe(message),
488 source,
489 }))
490 }
491}
492
493impl From<anyhow::Error> for TurboTasksExecutionError {
494 fn from(err: anyhow::Error) -> Self {
495 let current: &(dyn std::error::Error + 'static) = err.as_ref();
496 current.into()
497 }
498}
499
500pub trait Backend: Sync + Send {
501 #[allow(unused_variables)]
502 fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
503
504 #[allow(unused_variables)]
505 fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
506 #[allow(unused_variables)]
507 fn stopping(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
508
509 #[allow(unused_variables)]
510 fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
511 #[allow(unused_variables)]
512 fn idle_end(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
513
514 fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
515
516 fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
517 fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
518
519 fn invalidate_serialization(
520 &self,
521 _task: TaskId,
522 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
523 ) {
524 }
525
526 fn get_task_description(&self, task: TaskId) -> String;
527
528 fn try_start_task_execution<'a>(
529 &'a self,
530 task: TaskId,
531 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
532 ) -> Option<TaskExecutionSpec<'a>>;
533
534 fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
535
536 fn task_execution_completed(
537 &self,
538 task: TaskId,
539 duration: Duration,
540 memory_usage: usize,
541 result: Result<RawVc, TurboTasksExecutionError>,
542 cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
543 stateful: bool,
544 has_invalidator: bool,
545 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
546 ) -> bool;
547
548 type BackendJob: Send + 'static;
549
550 fn run_backend_job<'a>(
551 &'a self,
552 job: Self::BackendJob,
553 turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
554 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
555
556 fn try_read_task_output(
559 &self,
560 task: TaskId,
561 reader: Option<TaskId>,
562 options: ReadOutputOptions,
563 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
564 ) -> Result<Result<RawVc, EventListener>>;
565
566 fn try_read_task_cell(
569 &self,
570 task: TaskId,
571 index: CellId,
572 reader: Option<TaskId>,
573 options: ReadCellOptions,
574 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
575 ) -> Result<Result<TypedCellContent, EventListener>>;
576
577 fn try_read_own_task_cell(
580 &self,
581 current_task: TaskId,
582 index: CellId,
583 options: ReadCellOptions,
584 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
585 ) -> Result<TypedCellContent> {
586 match self.try_read_task_cell(current_task, index, None, options, turbo_tasks)? {
587 Ok(content) => Ok(content),
588 Err(_) => Ok(TypedCellContent(index.type_id, CellContent(None))),
589 }
590 }
591
592 fn read_task_collectibles(
595 &self,
596 task: TaskId,
597 trait_id: TraitTypeId,
598 reader: Option<TaskId>,
599 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
600 ) -> TaskCollectiblesMap;
601
602 fn emit_collectible(
603 &self,
604 trait_type: TraitTypeId,
605 collectible: RawVc,
606 task: TaskId,
607 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
608 );
609
610 fn unemit_collectible(
611 &self,
612 trait_type: TraitTypeId,
613 collectible: RawVc,
614 count: u32,
615 task: TaskId,
616 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
617 );
618
619 fn update_task_cell(
620 &self,
621 task: TaskId,
622 index: CellId,
623 content: CellContent,
624 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
625 );
626
627 fn get_or_create_persistent_task(
628 &self,
629 task_type: CachedTaskType,
630 parent_task: Option<TaskId>,
631 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
632 ) -> TaskId;
633
634 fn get_or_create_transient_task(
635 &self,
636 task_type: CachedTaskType,
637 parent_task: Option<TaskId>,
638 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
639 ) -> TaskId;
640
641 fn connect_task(
642 &self,
643 task: TaskId,
644 parent_task: Option<TaskId>,
645 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
646 );
647
648 fn mark_own_task_as_finished(
649 &self,
650 _task: TaskId,
651 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
652 ) {
653 }
655
656 fn set_own_task_aggregation_number(
657 &self,
658 _task: TaskId,
659 _aggregation_number: u32,
660 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
661 ) {
662 }
664
665 fn mark_own_task_as_session_dependent(
666 &self,
667 _task: TaskId,
668 _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
669 ) {
670 }
672
673 fn create_transient_task(
674 &self,
675 task_type: TransientTaskType,
676 turbo_tasks: &dyn TurboTasksBackendApi<Self>,
677 ) -> TaskId;
678
679 fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
680
681 fn task_statistics(&self) -> &TaskStatisticsApi;
682
683 fn is_tracking_dependencies(&self) -> bool;
684}