Skip to main content

turbo_tasks/
backend.rs

1use std::{
2    borrow::Cow,
3    error::Error,
4    fmt::{self, Debug, Display},
5    future::Future,
6    hash::{BuildHasherDefault, Hash},
7    pin::Pin,
8    sync::Arc,
9};
10
11use anyhow::{Result, anyhow};
12use auto_hash_map::AutoMap;
13use bincode::{
14    Decode, Encode,
15    de::Decoder,
16    enc::Encoder,
17    error::{DecodeError, EncodeError},
18    impl_borrow_decode,
19};
20use rustc_hash::FxHasher;
21use smallvec::SmallVec;
22use tracing::Span;
23use turbo_bincode::{
24    TurboBincodeDecode, TurboBincodeDecoder, TurboBincodeEncode, TurboBincodeEncoder,
25    impl_decode_for_turbo_bincode_decode, impl_encode_for_turbo_bincode_encode, new_hash_encoder,
26};
27use turbo_rcstr::RcStr;
28use turbo_tasks_hash::DeterministicHasher;
29
30use crate::{
31    RawVc, ReadCellOptions, ReadOutputOptions, ReadRef, SharedReference, TaskId, TaskIdSet,
32    TaskPriority, TraitRef, TraitTypeId, TurboTasksCallApi, TurboTasksPanic, ValueTypeId,
33    VcValueTrait, VcValueType, event::EventListener, macro_helpers::NativeFunction,
34    magic_any::MagicAny, manager::TurboTasksBackendApi, raw_vc::CellId, registry,
35    task::shared_reference::TypedSharedReference, task_statistics::TaskStatisticsApi, turbo_tasks,
36};
37
38pub type TransientTaskRoot =
39    Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> + Send + Sync>;
40
41pub enum TransientTaskType {
42    /// A root task that will track dependencies and re-execute when
43    /// dependencies change. Task will eventually settle to the correct
44    /// execution.
45    ///
46    /// Always active. Automatically scheduled.
47    Root(TransientTaskRoot),
48
49    // TODO implement these strongly consistency
50    /// A single root task execution. It won't track dependencies.
51    ///
52    /// Task will definitely include all invalidations that happened before the
53    /// start of the task. It may or may not include invalidations that
54    /// happened after that. It may see these invalidations partially
55    /// applied.
56    ///
57    /// Active until done. Automatically scheduled.
58    Once(Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>),
59}
60
61impl Debug for TransientTaskType {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            Self::Root(_) => f.debug_tuple("Root").finish(),
65            Self::Once(_) => f.debug_tuple("Once").finish(),
66        }
67    }
68}
69
70/// A normal task execution containing a native (rust) function. This type is passed into the
71/// backend either to execute a function or to look up a cached result.
72#[derive(Debug, Eq)]
73pub struct CachedTaskType {
74    pub native_fn: &'static NativeFunction,
75    pub this: Option<RawVc>,
76    pub arg: Box<dyn MagicAny>,
77}
78
79impl CachedTaskType {
80    /// Get the name of the function from the registry. Equivalent to the
81    /// [`Display`]/[`ToString::to_string`] implementation, but does not allocate a [`String`].
82    pub fn get_name(&self) -> &'static str {
83        self.native_fn.name
84    }
85
86    /// Encodes this task type directly to a hasher, avoiding buffer allocation.
87    ///
88    /// This uses the same encoding logic as [`TurboBincodeEncode`] but writes
89    /// directly to a [`DeterministicHasher`] instead of a buffer.
90    pub fn hash_encode<H: DeterministicHasher>(&self, hasher: &mut H) {
91        let fn_id = registry::get_function_id(self.native_fn);
92        {
93            let mut encoder = new_hash_encoder(hasher);
94            Encode::encode(&fn_id, &mut encoder).expect("fn_id encoding should not fail");
95            Encode::encode(&self.this, &mut encoder).expect("this encoding should not fail");
96        }
97        (self.native_fn.arg_meta.hash_encode)(&*self.arg, hasher);
98    }
99}
100
101impl TurboBincodeEncode for CachedTaskType {
102    fn encode(&self, encoder: &mut TurboBincodeEncoder) -> Result<(), EncodeError> {
103        Encode::encode(&registry::get_function_id(self.native_fn), encoder)?;
104
105        let (encode_arg_any, _) = self.native_fn.arg_meta.bincode;
106        Encode::encode(&self.this, encoder)?;
107        encode_arg_any(&*self.arg, encoder)?;
108
109        Ok(())
110    }
111}
112
113impl<Context> TurboBincodeDecode<Context> for CachedTaskType {
114    fn decode(decoder: &mut TurboBincodeDecoder) -> Result<Self, DecodeError> {
115        let native_fn = registry::get_native_function(Decode::decode(decoder)?);
116
117        let (_, decode_arg_any) = native_fn.arg_meta.bincode;
118        let this = Decode::decode(decoder)?;
119        let arg = decode_arg_any(decoder)?;
120
121        Ok(Self {
122            native_fn,
123            this,
124            arg,
125        })
126    }
127}
128
129impl_encode_for_turbo_bincode_encode!(CachedTaskType);
130impl_decode_for_turbo_bincode_decode!(CachedTaskType);
131impl_borrow_decode!(CachedTaskType);
132
133// Manual implementation is needed because of a borrow issue with `Box<dyn Trait>`:
134// https://github.com/rust-lang/rust/issues/31740
135impl PartialEq for CachedTaskType {
136    #[expect(clippy::op_ref)]
137    fn eq(&self, other: &Self) -> bool {
138        self.native_fn == other.native_fn && self.this == other.this && &self.arg == &other.arg
139    }
140}
141
142// Manual implementation because we have to have a manual `PartialEq` implementation, and clippy
143// complains if we have a derived `Hash` impl, but manual `PartialEq` impl.
144impl Hash for CachedTaskType {
145    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
146        self.native_fn.hash(state);
147        self.this.hash(state);
148        self.arg.hash(state);
149    }
150}
151
152impl Display for CachedTaskType {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        f.write_str(self.get_name())
155    }
156}
157
158pub struct TaskExecutionSpec<'a> {
159    pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'a>>,
160    pub span: Span,
161}
162
163#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
164pub struct CellContent(pub Option<SharedReference>);
165#[derive(Clone, Debug, PartialEq, Eq, Hash)]
166pub struct TypedCellContent(pub ValueTypeId, pub CellContent);
167
168impl Display for CellContent {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        match &self.0 {
171            None => write!(f, "empty"),
172            Some(content) => Display::fmt(content, f),
173        }
174    }
175}
176
177impl TypedCellContent {
178    pub fn cast<T: VcValueType>(self) -> Result<ReadRef<T>> {
179        let data = self.1.0.ok_or_else(|| anyhow!("Cell is empty"))?;
180        let data = data
181            .downcast::<T>()
182            .map_err(|_err| anyhow!("Unexpected type in cell"))?;
183        Ok(ReadRef::new_arc(data))
184    }
185
186    /// # Safety
187    ///
188    /// The caller must ensure that the TypedCellContent contains a vc
189    /// that implements T.
190    pub fn cast_trait<T>(self) -> Result<TraitRef<T>>
191    where
192        T: VcValueTrait + ?Sized,
193    {
194        let shared_reference = self
195            .1
196            .0
197            .ok_or_else(|| anyhow!("Cell is empty"))?
198            .into_typed(self.0);
199        Ok(
200            // Safety: It is a TypedSharedReference
201            TraitRef::new(shared_reference),
202        )
203    }
204
205    pub fn into_untyped(self) -> CellContent {
206        self.1
207    }
208
209    pub fn encode(&self, enc: &mut TurboBincodeEncoder) -> Result<(), EncodeError> {
210        let Self(type_id, content) = self;
211        let value_type = registry::get_value_type(*type_id);
212        type_id.encode(enc)?;
213        if let Some(bincode) = value_type.bincode {
214            if let Some(reference) = &content.0 {
215                true.encode(enc)?;
216                bincode.0(&*reference.0, enc)?;
217                Ok(())
218            } else {
219                false.encode(enc)?;
220                Ok(())
221            }
222        } else {
223            Ok(())
224        }
225    }
226
227    pub fn decode(dec: &mut TurboBincodeDecoder) -> Result<Self, DecodeError> {
228        let type_id = ValueTypeId::decode(dec)?;
229        let value_type = registry::get_value_type(type_id);
230        if let Some(bincode) = value_type.bincode {
231            let is_some = bool::decode(dec)?;
232            if is_some {
233                let reference = bincode.1(dec)?;
234                return Ok(TypedCellContent(type_id, CellContent(Some(reference))));
235            }
236        }
237        Ok(TypedCellContent(type_id, CellContent(None)))
238    }
239}
240
241impl From<TypedSharedReference> for TypedCellContent {
242    fn from(value: TypedSharedReference) -> Self {
243        TypedCellContent(value.type_id, CellContent(Some(value.reference)))
244    }
245}
246
247impl TryFrom<TypedCellContent> for TypedSharedReference {
248    type Error = TypedCellContent;
249
250    fn try_from(content: TypedCellContent) -> Result<Self, TypedCellContent> {
251        if let TypedCellContent(type_id, CellContent(Some(reference))) = content {
252            Ok(TypedSharedReference { type_id, reference })
253        } else {
254            Err(content)
255        }
256    }
257}
258
259impl CellContent {
260    pub fn into_typed(self, type_id: ValueTypeId) -> TypedCellContent {
261        TypedCellContent(type_id, self)
262    }
263}
264
265impl From<SharedReference> for CellContent {
266    fn from(value: SharedReference) -> Self {
267        CellContent(Some(value))
268    }
269}
270
271impl From<Option<SharedReference>> for CellContent {
272    fn from(value: Option<SharedReference>) -> Self {
273        CellContent(value)
274    }
275}
276
277impl TryFrom<CellContent> for SharedReference {
278    type Error = CellContent;
279
280    fn try_from(content: CellContent) -> Result<Self, CellContent> {
281        if let CellContent(Some(shared_reference)) = content {
282            Ok(shared_reference)
283        } else {
284            Err(content)
285        }
286    }
287}
288
289pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;
290
291// Structurally and functionally similar to Cow<&'static, str> but explicitly notes the importance
292// of non-static strings potentially containing PII (Personal Identifiable Information).
293#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
294pub enum TurboTasksExecutionErrorMessage {
295    PIISafe(#[bincode(with = "turbo_bincode::owned_cow")] Cow<'static, str>),
296    NonPIISafe(String),
297}
298
299impl Display for TurboTasksExecutionErrorMessage {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301        match self {
302            TurboTasksExecutionErrorMessage::PIISafe(msg) => write!(f, "{msg}"),
303            TurboTasksExecutionErrorMessage::NonPIISafe(msg) => write!(f, "{msg}"),
304        }
305    }
306}
307
308#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
309pub struct TurboTasksError {
310    pub message: TurboTasksExecutionErrorMessage,
311    pub source: Option<TurboTasksExecutionError>,
312}
313
314/// Error context indicating that a task's execution failed. Stores a `task_id` and a reference to
315/// the `TurboTasksCallApi` so that the task name can be resolved lazily at display time (via
316/// [`TurboTasksCallApi::get_task_name`]) rather than eagerly at error creation time.
317#[derive(Clone)]
318pub struct TurboTaskContextError {
319    pub turbo_tasks: Arc<dyn TurboTasksCallApi>,
320    pub task_id: TaskId,
321    pub source: Option<TurboTasksExecutionError>,
322}
323
324impl PartialEq for TurboTaskContextError {
325    fn eq(&self, other: &Self) -> bool {
326        self.task_id == other.task_id && self.source == other.source
327    }
328}
329impl Eq for TurboTaskContextError {}
330
331impl Encode for TurboTaskContextError {
332    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
333        Encode::encode(&self.task_id, encoder)?;
334        Encode::encode(&self.source, encoder)?;
335        Ok(())
336    }
337}
338
339impl<Context> Decode<Context> for TurboTaskContextError {
340    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
341        let task_id = Decode::decode(decoder)?;
342        let source = Decode::decode(decoder)?;
343        let turbo_tasks = turbo_tasks();
344        Ok(Self {
345            turbo_tasks,
346            task_id,
347            source,
348        })
349    }
350}
351
352impl_borrow_decode!(TurboTaskContextError);
353
354impl Debug for TurboTaskContextError {
355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356        f.debug_struct("TurboTaskContextError")
357            .field("task_id", &self.task_id)
358            .field("source", &self.source)
359            .finish()
360    }
361}
362
363/// Error context for a local task that failed. Unlike [`TurboTaskContextError`],
364/// this stores the task name directly since local tasks don't have a [`TaskId`].
365#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
366pub struct TurboTaskLocalContextError {
367    pub name: RcStr,
368    pub source: Option<TurboTasksExecutionError>,
369}
370
371#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
372pub enum TurboTasksExecutionError {
373    Panic(Arc<TurboTasksPanic>),
374    Error(Arc<TurboTasksError>),
375    TaskContext(Arc<TurboTaskContextError>),
376    LocalTaskContext(Arc<TurboTaskLocalContextError>),
377}
378
379impl TurboTasksExecutionError {
380    /// Wraps this error in a [`TaskContext`](TurboTasksExecutionError::TaskContext) layer
381    /// identifying the normal task that encountered the error.
382    pub fn with_task_context(
383        self,
384        task_id: TaskId,
385        turbo_tasks: Arc<dyn TurboTasksCallApi>,
386    ) -> Self {
387        TurboTasksExecutionError::TaskContext(Arc::new(TurboTaskContextError {
388            task_id,
389            turbo_tasks,
390            source: Some(self),
391        }))
392    }
393
394    /// Wraps this error in a [`LocalTaskContext`](TurboTasksExecutionError::LocalTaskContext) layer
395    /// identifying the local task that encountered the error.
396    pub fn with_local_task_context(self, name: String) -> Self {
397        TurboTasksExecutionError::LocalTaskContext(Arc::new(TurboTaskLocalContextError {
398            name: RcStr::from(name),
399            source: Some(self),
400        }))
401    }
402}
403
404impl Error for TurboTasksExecutionError {
405    fn source(&self) -> Option<&(dyn Error + 'static)> {
406        match self {
407            TurboTasksExecutionError::Panic(_panic) => None,
408            TurboTasksExecutionError::Error(error) => {
409                error.source.as_ref().map(|s| s as &dyn Error)
410            }
411            TurboTasksExecutionError::TaskContext(context_error) => {
412                context_error.source.as_ref().map(|s| s as &dyn Error)
413            }
414            TurboTasksExecutionError::LocalTaskContext(context_error) => {
415                context_error.source.as_ref().map(|s| s as &dyn Error)
416            }
417        }
418    }
419}
420
421impl Display for TurboTasksExecutionError {
422    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423        match self {
424            TurboTasksExecutionError::Panic(panic) => write!(f, "{}", &panic),
425            TurboTasksExecutionError::Error(error) => {
426                write!(f, "{}", error.message)
427            }
428            TurboTasksExecutionError::TaskContext(context_error) => {
429                let task_id = context_error.task_id;
430                let name = context_error.turbo_tasks.get_task_name(task_id);
431                if cfg!(feature = "task_id_details") {
432                    write!(f, "Execution of {name} ({}) failed", task_id)
433                } else {
434                    write!(f, "Execution of {name} failed")
435                }
436            }
437            TurboTasksExecutionError::LocalTaskContext(context_error) => {
438                write!(f, "Execution of {} failed", context_error.name)
439            }
440        }
441    }
442}
443
444impl<'l> From<&'l (dyn std::error::Error + 'static)> for TurboTasksExecutionError {
445    fn from(err: &'l (dyn std::error::Error + 'static)) -> Self {
446        if let Some(err) = err.downcast_ref::<TurboTasksExecutionError>() {
447            return err.clone();
448        }
449        let message = err.to_string();
450        let source = err.source().map(|source| source.into());
451
452        TurboTasksExecutionError::Error(Arc::new(TurboTasksError {
453            message: TurboTasksExecutionErrorMessage::NonPIISafe(message),
454            source,
455        }))
456    }
457}
458
459impl From<anyhow::Error> for TurboTasksExecutionError {
460    fn from(err: anyhow::Error) -> Self {
461        let current: &(dyn std::error::Error + 'static) = err.as_ref();
462        current.into()
463    }
464}
465
466pub enum VerificationMode {
467    EqualityCheck,
468    Skip,
469}
470
471pub trait Backend: Sync + Send {
472    #[allow(unused_variables)]
473    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
474
475    #[allow(unused_variables)]
476    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
477    #[allow(unused_variables)]
478    fn stopping(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
479
480    #[allow(unused_variables)]
481    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
482    #[allow(unused_variables)]
483    fn idle_end(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
484
485    fn invalidate_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
486
487    fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
488    fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
489
490    fn invalidate_serialization(
491        &self,
492        _task: TaskId,
493        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
494    ) {
495    }
496
497    fn try_start_task_execution<'a>(
498        &'a self,
499        task: TaskId,
500        priority: TaskPriority,
501        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
502    ) -> Option<TaskExecutionSpec<'a>>;
503
504    fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
505
506    fn task_execution_completed(
507        &self,
508        task: TaskId,
509        result: Result<RawVc, TurboTasksExecutionError>,
510        cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
511        #[cfg(feature = "verify_determinism")] stateful: bool,
512        has_invalidator: bool,
513        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
514    ) -> bool;
515
516    type BackendJob: Send + 'static;
517
518    fn run_backend_job<'a>(
519        &'a self,
520        job: Self::BackendJob,
521        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
522    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
523
524    /// INVALIDATION: Be careful with this, when reader is None, it will not track dependencies, so
525    /// using it could break cache invalidation.
526    fn try_read_task_output(
527        &self,
528        task: TaskId,
529        reader: Option<TaskId>,
530        options: ReadOutputOptions,
531        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
532    ) -> Result<Result<RawVc, EventListener>>;
533
534    /// INVALIDATION: Be careful with this, when reader is None, it will not track dependencies, so
535    /// using it could break cache invalidation.
536    fn try_read_task_cell(
537        &self,
538        task: TaskId,
539        index: CellId,
540        reader: Option<TaskId>,
541        options: ReadCellOptions,
542        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
543    ) -> Result<Result<TypedCellContent, EventListener>>;
544
545    /// INVALIDATION: Be careful with this, it will not track dependencies, so
546    /// using it could break cache invalidation.
547    fn try_read_own_task_cell(
548        &self,
549        current_task: TaskId,
550        index: CellId,
551        options: ReadCellOptions,
552        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
553    ) -> Result<TypedCellContent> {
554        match self.try_read_task_cell(current_task, index, None, options, turbo_tasks)? {
555            Ok(content) => Ok(content),
556            Err(_) => Ok(TypedCellContent(index.type_id, CellContent(None))),
557        }
558    }
559
560    /// INVALIDATION: Be careful with this, when reader is None, it will not track dependencies, so
561    /// using it could break cache invalidation.
562    fn read_task_collectibles(
563        &self,
564        task: TaskId,
565        trait_id: TraitTypeId,
566        reader: Option<TaskId>,
567        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
568    ) -> TaskCollectiblesMap;
569
570    fn emit_collectible(
571        &self,
572        trait_type: TraitTypeId,
573        collectible: RawVc,
574        task: TaskId,
575        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
576    );
577
578    fn unemit_collectible(
579        &self,
580        trait_type: TraitTypeId,
581        collectible: RawVc,
582        count: u32,
583        task: TaskId,
584        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
585    );
586
587    fn update_task_cell(
588        &self,
589        task: TaskId,
590        index: CellId,
591        is_serializable_cell_content: bool,
592        content: CellContent,
593        updated_key_hashes: Option<SmallVec<[u64; 2]>>,
594        verification_mode: VerificationMode,
595        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
596    );
597
598    fn get_or_create_persistent_task(
599        &self,
600        task_type: CachedTaskType,
601        parent_task: Option<TaskId>,
602        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
603    ) -> TaskId;
604
605    fn get_or_create_transient_task(
606        &self,
607        task_type: CachedTaskType,
608        parent_task: Option<TaskId>,
609        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
610    ) -> TaskId;
611
612    fn connect_task(
613        &self,
614        task: TaskId,
615        parent_task: Option<TaskId>,
616        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
617    );
618
619    fn mark_own_task_as_finished(
620        &self,
621        _task: TaskId,
622        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
623    ) {
624        // Do nothing by default
625    }
626
627    fn mark_own_task_as_session_dependent(
628        &self,
629        _task: TaskId,
630        _turbo_tasks: &dyn TurboTasksBackendApi<Self>,
631    ) {
632        // Do nothing by default
633    }
634
635    fn create_transient_task(
636        &self,
637        task_type: TransientTaskType,
638        turbo_tasks: &dyn TurboTasksBackendApi<Self>,
639    ) -> TaskId;
640
641    fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
642
643    fn task_statistics(&self) -> &TaskStatisticsApi;
644
645    fn is_tracking_dependencies(&self) -> bool;
646
647    /// Returns a human-readable name for the given task. Used by error display formatting
648    /// to lazily resolve task names instead of storing them eagerly in error objects.
649    fn get_task_name(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) -> String;
650}