Skip to main content

turbo_tasks_backend/
backing_storage.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use either::Either;
5use smallvec::SmallVec;
6use turbo_bincode::TurboBincodeBuffer;
7use turbo_tasks::{
8    DynTaskInputs, RawVc, TaskId, backend::CachedTaskType, macro_helpers::NativeFunction,
9};
10use turbo_tasks_hash::Xxh3Hash64Hasher;
11
12use crate::backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage};
13
/// Key type used by the TaskCache: the little-endian bytes of a 64-bit task type hash, as
/// produced by [`compute_task_type_hash`].
pub type TaskTypeHash = [u8; std::mem::size_of::<u64>()];
15
16/// A single item yielded by the snapshot iterator during persistence.
17pub struct SnapshotItem {
18    pub task_id: TaskId,
19    /// Serialized task meta data, if modified
20    pub meta: Option<TurboBincodeBuffer>,
21    /// Serialized task data, if modified
22    pub data: Option<TurboBincodeBuffer>,
23    /// Task type for new tasks that need to be added to the task cache
24    pub task_type_hash: Option<TaskTypeHash>,
25}
26
27/// Computes a deterministic 64-bit hash of a CachedTaskType for use as a TaskCache key.
28///
29/// This encodes the task type directly to a hasher, avoiding intermediate buffer allocation.
30/// The encoding is deterministic (function IDs from registry, bincode argument encoding).
31pub fn compute_task_type_hash(task_type: &CachedTaskType) -> TaskTypeHash {
32    let mut hasher = Xxh3Hash64Hasher::new();
33    task_type.hash_encode(&mut hasher);
34    let hash = hasher.finish();
35    if cfg!(feature = "verify_serialization") {
36        hasher = Xxh3Hash64Hasher::new();
37        task_type.hash_encode(&mut hasher);
38        let hash2 = hasher.finish();
39        assert_eq!(
40            hash, hash2,
41            "Hashing TaskType twice was non-deterministic: \n{:?}\ngot hashes {} != {}",
42            task_type, hash, hash2
43        );
44    }
45    hash.to_le_bytes()
46}
47
48/// Computes a deterministic 64-bit hash from task type components for use as a TaskCache key.
49///
50/// Like [`compute_task_type_hash`], but works with borrowed components so the caller does not need
51/// to construct (and box-allocate) a full [`CachedTaskType`] first.
52pub fn compute_task_type_hash_from_components(
53    native_fn: &'static NativeFunction,
54    this: Option<RawVc>,
55    arg: &dyn DynTaskInputs,
56) -> TaskTypeHash {
57    let mut hasher = Xxh3Hash64Hasher::new();
58    CachedTaskType::hash_encode_components(native_fn, this, arg, &mut hasher);
59    hasher.finish().to_le_bytes()
60}
61
62/// Represents types accepted by [`TurboTasksBackend::new`]. Typically this is the value returned by
63/// [`turbo_backing_storage`] or [`noop_backing_storage`].
64///
65/// This trait is [sealed]. External crates are not allowed to implement it.
66///
67/// [`turbo_backing_storage`]: crate::turbo_backing_storage
68/// [`noop_backing_storage`]: crate::noop_backing_storage
69/// [`TurboTasksBackend::new`]: crate::TurboTasksBackend::new
70/// [sealed]: https://predr.ag/blog/definitive-guide-to-sealed-traits-in-rust/
71pub trait BackingStorage: BackingStorageSealed {
72    /// Called when the database should be invalidated upon re-initialization.
73    ///
74    /// This typically means that we'll restart the process or `turbo-tasks` soon with a fresh
75    /// database. If this happens, there's no point in writing anything else to disk, or flushing
76    /// during [`TurboTasksBackend::stop`].
77    ///
78    /// [`TurboTasksBackend::stop`]: turbo_tasks::backend::Backend::stop
79    //
80    // This can be implemented by calling `database::db_invalidation::invalidate_db` with the
81    // database's non-versioned base path.
82    fn invalidate(&self, reason_code: &str) -> Result<()>;
83}
84
85/// Private methods used by [`BackingStorage`]. This trait is `pub` (because of the sealed-trait
86/// pattern), but should not be exported outside of the crate.
87///
88/// [`BackingStorage`] is exported for documentation reasons and to expose the public
89/// [`BackingStorage::invalidate`] method.
90pub trait BackingStorageSealed: 'static + Send + Sync {
91    fn next_free_task_id(&self) -> Result<TaskId>;
92    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>>;
93
94    fn save_snapshot<I>(&self, operations: Vec<Arc<AnyOperation>>, snapshots: Vec<I>) -> Result<()>
95    where
96        I: IntoIterator<Item = SnapshotItem> + Send + Sync;
97    /// Returns all task IDs that match the given task type (hash collision candidates).
98    ///
99    /// Since TaskCache uses hash-based keys, multiple task types may (rarely) hash to the same key.
100    /// The caller must verify each returned TaskId by comparing the stored task type which will
101    /// require a second database read
102    fn lookup_task_candidates(
103        &self,
104        native_fn: &'static NativeFunction,
105        this: Option<RawVc>,
106        arg: &dyn DynTaskInputs,
107    ) -> Result<SmallVec<[TaskId; 1]>>;
108    /// Looks up and decodes persisted data for a single task, updating the provided storage with
109    /// data from the database in the given category.
110    fn lookup_data(
111        &self,
112        task_id: TaskId,
113        category: SpecificTaskDataCategory,
114        storage: &mut TaskStorage,
115    ) -> Result<()>;
116
117    /// Batch lookup and decode data for multiple tasks directly into TypedStorage instances.
118    /// Returns a vector of TypedStorage, one for each task_id in the input slice.
119    fn batch_lookup_data(
120        &self,
121        task_ids: &[TaskId],
122        category: SpecificTaskDataCategory,
123    ) -> Result<Vec<TaskStorage>>;
124
125    fn compact(&self) -> Result<bool> {
126        Ok(false)
127    }
128
129    fn shutdown(&self) -> Result<()> {
130        Ok(())
131    }
132
133    /// Returns true if the database is in an unrecoverable error state where a previous write or
134    /// compaction failed and the rollback also failed, permanently disabling further writes.
135    fn has_unrecoverable_write_error(&self) -> bool {
136        false
137    }
138}
139
140impl<L, R> BackingStorage for Either<L, R>
141where
142    L: BackingStorage,
143    R: BackingStorage,
144{
145    fn invalidate(&self, reason_code: &str) -> Result<()> {
146        either::for_both!(self, this => this.invalidate(reason_code))
147    }
148}
149
150impl<L, R> BackingStorageSealed for Either<L, R>
151where
152    L: BackingStorageSealed,
153    R: BackingStorageSealed,
154{
155    fn next_free_task_id(&self) -> Result<TaskId> {
156        either::for_both!(self, this => this.next_free_task_id())
157    }
158
159    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
160        either::for_both!(self, this => this.uncompleted_operations())
161    }
162
163    fn save_snapshot<I>(&self, operations: Vec<Arc<AnyOperation>>, snapshots: Vec<I>) -> Result<()>
164    where
165        I: IntoIterator<Item = SnapshotItem> + Send + Sync,
166    {
167        either::for_both!(self, this => this.save_snapshot(
168            operations,
169            snapshots,
170        ))
171    }
172
173    fn lookup_task_candidates(
174        &self,
175        native_fn: &'static NativeFunction,
176        this: Option<RawVc>,
177        arg: &dyn DynTaskInputs,
178    ) -> Result<SmallVec<[TaskId; 1]>> {
179        either::for_both!(self, this_impl => this_impl.lookup_task_candidates(native_fn, this, arg))
180    }
181
182    fn lookup_data(
183        &self,
184        task_id: TaskId,
185        category: SpecificTaskDataCategory,
186        storage: &mut TaskStorage,
187    ) -> Result<()> {
188        either::for_both!(self, this => this.lookup_data(task_id, category, storage))
189    }
190
191    fn batch_lookup_data(
192        &self,
193        task_ids: &[TaskId],
194        category: SpecificTaskDataCategory,
195    ) -> Result<Vec<TaskStorage>> {
196        either::for_both!(self, this => this.batch_lookup_data(task_ids, category))
197    }
198
199    fn compact(&self) -> Result<bool> {
200        either::for_both!(self, this => this.compact())
201    }
202
203    fn shutdown(&self) -> Result<()> {
204        either::for_both!(self, this => this.shutdown())
205    }
206
207    fn has_unrecoverable_write_error(&self) -> bool {
208        either::for_both!(self, this => this.has_unrecoverable_write_error())
209    }
210}