turbo_tasks/
backend.rs

1use std::{
2    borrow::Cow,
3    error::Error,
4    fmt::{self, Debug, Display},
5    future::Future,
6    hash::{BuildHasherDefault, Hash},
7    pin::Pin,
8    sync::Arc,
9};
10
11use anyhow::{Result, anyhow};
12use auto_hash_map::AutoMap;
13use bincode::{
14    Decode, Encode,
15    error::{DecodeError, EncodeError},
16    impl_borrow_decode,
17};
18use rustc_hash::FxHasher;
19use smallvec::SmallVec;
20use tracing::Span;
21use turbo_bincode::{
22    TurboBincodeDecode, TurboBincodeDecoder, TurboBincodeEncode, TurboBincodeEncoder,
23    impl_decode_for_turbo_bincode_decode, impl_encode_for_turbo_bincode_encode,
24};
25use turbo_rcstr::RcStr;
26
27use crate::{
28    RawVc, ReadCellOptions, ReadOutputOptions, ReadRef, SharedReference, TaskId, TaskIdSet,
29    TaskPriority, TraitRef, TraitTypeId, TurboTasksPanic, ValueTypeId, VcValueTrait, VcValueType,
30    event::EventListener, macro_helpers::NativeFunction, magic_any::MagicAny,
31    manager::TurboTasksBackendApi, raw_vc::CellId, registry,
32    task::shared_reference::TypedSharedReference, task_statistics::TaskStatisticsApi,
33};
34
35pub type TransientTaskRoot =
36    Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> + Send + Sync>;
37
38pub enum TransientTaskType {
39    /// A root task that will track dependencies and re-execute when
40    /// dependencies change. Task will eventually settle to the correct
41    /// execution.
42    ///
43    /// Always active. Automatically scheduled.
44    Root(TransientTaskRoot),
45
46    // TODO implement these strongly consistency
47    /// A single root task execution. It won't track dependencies.
48    ///
49    /// Task will definitely include all invalidations that happened before the
50    /// start of the task. It may or may not include invalidations that
51    /// happened after that. It may see these invalidations partially
52    /// applied.
53    ///
54    /// Active until done. Automatically scheduled.
55    Once(Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>),
56}
57
58impl Debug for TransientTaskType {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            Self::Root(_) => f.debug_tuple("Root").finish(),
62            Self::Once(_) => f.debug_tuple("Once").finish(),
63        }
64    }
65}
66
67/// A normal task execution containing a native (rust) function. This type is passed into the
68/// backend either to execute a function or to look up a cached result.
69#[derive(Debug, Eq)]
70pub struct CachedTaskType {
71    pub native_fn: &'static NativeFunction,
72    pub this: Option<RawVc>,
73    pub arg: Box<dyn MagicAny>,
74}
75
76impl CachedTaskType {
77    /// Get the name of the function from the registry. Equivalent to the
78    /// [`Display`]/[`ToString::to_string`] implementation, but does not allocate a [`String`].
79    pub fn get_name(&self) -> &'static str {
80        self.native_fn.name
81    }
82}
83
84impl TurboBincodeEncode for CachedTaskType {
85    fn encode(&self, encoder: &mut TurboBincodeEncoder) -> Result<(), EncodeError> {
86        Encode::encode(&registry::get_function_id(self.native_fn), encoder)?;
87
88        let (encode_arg_any, _) = self.native_fn.arg_meta.bincode;
89        Encode::encode(&self.this, encoder)?;
90        encode_arg_any(&*self.arg, encoder)?;
91
92        Ok(())
93    }
94}
95
96impl<Context> TurboBincodeDecode<Context> for CachedTaskType {
97    fn decode(decoder: &mut TurboBincodeDecoder) -> Result<Self, DecodeError> {
98        let native_fn = registry::get_native_function(Decode::decode(decoder)?);
99
100        let (_, decode_arg_any) = native_fn.arg_meta.bincode;
101        let this = Decode::decode(decoder)?;
102        let arg = decode_arg_any(decoder)?;
103
104        Ok(Self {
105            native_fn,
106            this,
107            arg,
108        })
109    }
110}
111
112impl_encode_for_turbo_bincode_encode!(CachedTaskType);
113impl_decode_for_turbo_bincode_decode!(CachedTaskType);
114impl_borrow_decode!(CachedTaskType);
115
116// Manual implementation is needed because of a borrow issue with `Box<dyn Trait>`:
117// https://github.com/rust-lang/rust/issues/31740
118impl PartialEq for CachedTaskType {
119    #[expect(clippy::op_ref)]
120    fn eq(&self, other: &Self) -> bool {
121        self.native_fn == other.native_fn && self.this == other.this && &self.arg == &other.arg
122    }
123}
124
125// Manual implementation because we have to have a manual `PartialEq` implementation, and clippy
126// complains if we have a derived `Hash` impl, but manual `PartialEq` impl.
127impl Hash for CachedTaskType {
128    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
129        self.native_fn.hash(state);
130        self.this.hash(state);
131        self.arg.hash(state);
132    }
133}
134
135impl Display for CachedTaskType {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        f.write_str(self.get_name())
138    }
139}
140
141pub struct TaskExecutionSpec<'a> {
142    pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'a>>,
143    pub span: Span,
144}
145
146#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
147pub struct CellContent(pub Option<SharedReference>);
148#[derive(Clone, Debug, PartialEq, Eq, Hash)]
149pub struct TypedCellContent(pub ValueTypeId, pub CellContent);
150
151impl Display for CellContent {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        match &self.0 {
154            None => write!(f, "empty"),
155            Some(content) => Display::fmt(content, f),
156        }
157    }
158}
159
160impl TypedCellContent {
161    pub fn cast<T: VcValueType>(self) -> Result<ReadRef<T>> {
162        let data = self.1.0.ok_or_else(|| anyhow!("Cell is empty"))?;
163        let data = data
164            .downcast::<T>()
165            .map_err(|_err| anyhow!("Unexpected type in cell"))?;
166        Ok(ReadRef::new_arc(data))
167    }
168
169    /// # Safety
170    ///
171    /// The caller must ensure that the TypedCellContent contains a vc
172    /// that implements T.
173    pub fn cast_trait<T>(self) -> Result<TraitRef<T>>
174    where
175        T: VcValueTrait + ?Sized,
176    {
177        let shared_reference = self
178            .1
179            .0
180            .ok_or_else(|| anyhow!("Cell is empty"))?
181            .into_typed(self.0);
182        Ok(
183            // Safety: It is a TypedSharedReference
184            TraitRef::new(shared_reference),
185        )
186    }
187
188    pub fn into_untyped(self) -> CellContent {
189        self.1
190    }
191
192    pub fn encode(&self, enc: &mut TurboBincodeEncoder) -> Result<(), EncodeError> {
193        let Self(type_id, content) = self;
194        let value_type = registry::get_value_type(*type_id);
195        type_id.encode(enc)?;
196        if let Some(bincode) = value_type.bincode {
197            if let Some(reference) = &content.0 {
198                true.encode(enc)?;
199                bincode.0(&*reference.0, enc)?;
200                Ok(())
201            } else {
202                false.encode(enc)?;
203                Ok(())
204            }
205        } else {
206            Ok(())
207        }
208    }
209
210    pub fn decode(dec: &mut TurboBincodeDecoder) -> Result<Self, DecodeError> {
211        let type_id = ValueTypeId::decode(dec)?;
212        let value_type = registry::get_value_type(type_id);
213        if let Some(bincode) = value_type.bincode {
214            let is_some = bool::decode(dec)?;
215            if is_some {
216                let reference = bincode.1(dec)?;
217                return Ok(TypedCellContent(type_id, CellContent(Some(reference))));
218            }
219        }
220        Ok(TypedCellContent(type_id, CellContent(None)))
221    }
222}
223
224impl From<TypedSharedReference> for TypedCellContent {
225    fn from(value: TypedSharedReference) -> Self {
226        TypedCellContent(value.type_id, CellContent(Some(value.reference)))
227    }
228}
229
230impl TryFrom<TypedCellContent> for TypedSharedReference {
231    type Error = TypedCellContent;
232
233    fn try_from(content: TypedCellContent) -> Result<Self, TypedCellContent> {
234        if let TypedCellContent(type_id, CellContent(Some(reference))) = content {
235            Ok(TypedSharedReference { type_id, reference })
236        } else {
237            Err(content)
238        }
239    }
240}
241
242impl CellContent {
243    pub fn into_typed(self, type_id: ValueTypeId) -> TypedCellContent {
244        TypedCellContent(type_id, self)
245    }
246}
247
248impl From<SharedReference> for CellContent {
249    fn from(value: SharedReference) -> Self {
250        CellContent(Some(value))
251    }
252}
253
254impl From<Option<SharedReference>> for CellContent {
255    fn from(value: Option<SharedReference>) -> Self {
256        CellContent(value)
257    }
258}
259
260impl TryFrom<CellContent> for SharedReference {
261    type Error = CellContent;
262
263    fn try_from(content: CellContent) -> Result<Self, CellContent> {
264        if let CellContent(Some(shared_reference)) = content {
265            Ok(shared_reference)
266        } else {
267            Err(content)
268        }
269    }
270}
271
272pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;
273
274// Structurally and functionally similar to Cow<&'static, str> but explicitly notes the importance
275// of non-static strings potentially containing PII (Personal Identifiable Information).
276#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)]
277pub enum TurboTasksExecutionErrorMessage {
278    PIISafe(#[bincode(with = "turbo_bincode::owned_cow")] Cow<'static, str>),
279    NonPIISafe(String),
280}
281
282impl Display for TurboTasksExecutionErrorMessage {
283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284        match self {
285            TurboTasksExecutionErrorMessage::PIISafe(msg) => write!(f, "{msg}"),
286            TurboTasksExecutionErrorMessage::NonPIISafe(msg) => write!(f, "{msg}"),
287        }
288    }
289}
290
291#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
292pub struct TurboTasksError {
293    pub message: TurboTasksExecutionErrorMessage,
294    pub source: Option<TurboTasksExecutionError>,
295}
296
297#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
298pub struct TurboTaskContextError {
299    pub task: RcStr,
300    #[cfg(feature = "task_id_details")]
301    pub task_id: Option<TaskId>,
302    pub source: Option<TurboTasksExecutionError>,
303}
304
305#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)]
306pub enum TurboTasksExecutionError {
307    Panic(Arc<TurboTasksPanic>),
308    Error(Arc<TurboTasksError>),
309    TaskContext(Arc<TurboTaskContextError>),
310}
311
312impl TurboTasksExecutionError {
313    pub fn with_task_context(&self, task: impl Display, _task_id: Option<TaskId>) -> Self {
314        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
315            task: RcStr::from(task.to_string()),
316            #[cfg(feature = "task_id_details")]
317            task_id: _task_id,
318            source: Some(self.clone()),
319        }))
320    }
321}
322
323impl Error for TurboTasksExecutionError {
324    fn source(&self) -> Option<&(dyn Error + 'static)> {
325        match self {
326            TurboTasksExecutionError::Panic(_panic) => None,
327            TurboTasksExecutionError::Error(error) => {
328                error.source.as_ref().map(|s| s as &dyn Error)
329            }
330            TurboTasksExecutionError::TaskContext(context_error) => {
331                context_error.source.as_ref().map(|s| s as &dyn Error)
332            }
333        }
334    }
335}
336
337impl Display for TurboTasksExecutionError {
338    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
339        match self {
340            TurboTasksExecutionError::Panic(panic) => write!(f, "{}", &panic),
341            TurboTasksExecutionError::Error(error) => {
342                write!(f, "{}", error.message)
343            }
344            TurboTasksExecutionError::TaskContext(context_error) => {
345                #[cfg(feature = "task_id_details")]
346                if let Some(task_id) = context_error.task_id {
347                    return write!(
348                        f,
349                        "Execution of {} ({}) failed",
350                        context_error.task, task_id
351                    );
352                }
353                write!(f, "Execution of {} failed", context_error.task)
354            }
355        }
356    }
357}
358
359impl<'l> From<&'l (dyn std::error::Error + 'static)> for TurboTasksExecutionError {
360    fn from(err: &'l (dyn std::error::Error + 'static)) -> Self {
361        if let Some(err) = err.downcast_ref::<TurboTasksExecutionError>() {
362            return err.clone();
363        }
364        let message = err.to_string();
365        let source = err.source().map(|source| source.into());
366
367        TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
368            message: TurboTasksExecutionErrorMessage::NonPIISafe(message),
369            source,
370        }))
371    }
372}
373
374impl From<anyhow::Error> for TurboTasksExecutionError {
375    fn from(err: anyhow::Error) -> Self {
376        let current: &(dyn std::error::Error + 'static) = err.as_ref();
377        current.into()
378    }
379}
380
/// Verification mode passed to [`Backend::update_task_cell`].
///
/// NOTE(review): the exact semantics are defined by the backend implementation; judging by the
/// names, `EqualityCheck` asks for an equality-based verification of the update and `Skip`
/// disables it — confirm against the backend.
///
/// The common traits are derived so the mode can be logged, copied and compared by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationMode {
    EqualityCheck,
    Skip,
}
385
386pub trait Backend: Sync + Send {
387    #[allow(unused_variables)]
388    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
389
390    #[allow(unused_variables)]
391    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
392    #[allow(unused_variables)]
393    fn stopping(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
394
395    #[allow(unused_variables)]
396    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
397    #[allow(unused_variables)]
398    fn idle_end(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
399
400    fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
401
402    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
403    fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
404
405    fn invalidate_serialization(
406        &self,
407        _task: TaskId,
408        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
409    ) {
410    }
411
412    fn get_task_description(&self, task: TaskId) -> String;
413
414    fn try_start_task_execution<'a>(
415        &'a self,
416        task: TaskId,
417        priority: TaskPriority,
418        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
419    ) -> Option<TaskExecutionSpec<'a>>;
420
421    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
422
423    fn task_execution_completed(
424        &self,
425        task: TaskId,
426        result: Result<RawVc, TurboTasksExecutionError>,
427        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
428        has_invalidator: bool,
429        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
430    ) -> bool;
431
432    type BackendJob: Send + 'static;
433
434    fn run_backend_job<'a>(
435        &'a self,
436        job: Self::BackendJob,
437        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
438    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
439
440    /// INVALIDATION: Be careful with this, when reader is None, it will not track dependencies, so
441    /// using it could break cache invalidation.
442    fn try_read_task_output(
443        &self,
444        task: TaskId,
445        reader: Option<TaskId>,
446        options: ReadOutputOptions,
447        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
448    ) -> Result<Result<RawVc, EventListener>>;
449
450    /// INVALIDATION: Be careful with this, when reader is None, it will not track dependencies, so
451    /// using it could break cache invalidation.
452    fn try_read_task_cell(
453        &self,
454        task: TaskId,
455        index: CellId,
456        reader: Option<TaskId>,
457        options: ReadCellOptions,
458        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
459    ) -> Result<Result<TypedCellContent, EventListener>>;
460
461    /// INVALIDATION: Be careful with this, it will not track dependencies, so
462    /// using it could break cache invalidation.
463    fn try_read_own_task_cell(
464        &self,
465        current_task: TaskId,
466        index: CellId,
467        options: ReadCellOptions,
468        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
469    ) -> Result<TypedCellContent> {
470        match self.try_read_task_cell(current_task, index, None, options, turbo_tasks)? {
471            Ok(content) => Ok(content),
472            Err(_) => Ok(TypedCellContent(index.type_id, CellContent(None))),
473        }
474    }
475
476    /// INVALIDATION: Be careful with this, when reader is None, it will not track dependencies, so
477    /// using it could break cache invalidation.
478    fn read_task_collectibles(
479        &self,
480        task: TaskId,
481        trait_id: TraitTypeId,
482        reader: Option<TaskId>,
483        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
484    ) -> TaskCollectiblesMap;
485
486    fn emit_collectible(
487        &self,
488        trait_type: TraitTypeId,
489        collectible: RawVc,
490        task: TaskId,
491        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
492    );
493
494    fn unemit_collectible(
495        &self,
496        trait_type: TraitTypeId,
497        collectible: RawVc,
498        count: u32,
499        task: TaskId,
500        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
501    );
502
503    fn update_task_cell(
504        &self,
505        task: TaskId,
506        index: CellId,
507        is_serializable_cell_content: bool,
508        content: CellContent,
509        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
510        verification_mode: VerificationMode,
511        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
512    );
513
514    fn get_or_create_persistent_task(
515        &self,
516        task_type: CachedTaskType,
517        parent_task: Option<TaskId>,
518        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
519    ) -> TaskId;
520
521    fn get_or_create_transient_task(
522        &self,
523        task_type: CachedTaskType,
524        parent_task: Option<TaskId>,
525        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
526    ) -> TaskId;
527
528    fn connect_task(
529        &self,
530        task: TaskId,
531        parent_task: Option<TaskId>,
532        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
533    );
534
535    fn mark_own_task_as_finished(
536        &self,
537        _task: TaskId,
538        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
539    ) {
540        // Do nothing by default
541    }
542
543    fn set_own_task_aggregation_number(
544        &self,
545        _task: TaskId,
546        _aggregation_number: u32,
547        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
548    ) {
549        // Do nothing by default
550    }
551
552    fn mark_own_task_as_session_dependent(
553        &self,
554        _task: TaskId,
555        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
556    ) {
557        // Do nothing by default
558    }
559
560    fn create_transient_task(
561        &self,
562        task_type: TransientTaskType,
563        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
564    ) -> TaskId;
565
566    fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
567
568    fn task_statistics(&self) -> &TaskStatisticsApi;
569
570    fn is_tracking_dependencies(&self) -> bool;
571}