turbo_tasks_backend/
backing_storage.rs

1use std::{any::type_name, sync::Arc};
2
3use anyhow::Result;
4use either::Either;
5use smallvec::SmallVec;
6use turbo_tasks::{SessionId, TaskId, backend::CachedTaskType};
7
8use crate::{
9    backend::{AnyOperation, TaskDataCategory},
10    data::CachedDataItem,
11    utils::chunked_vec::ChunkedVec,
12};
13
14/// Represents types accepted by [`TurboTasksBackend::new`]. Typically this is the value returned by
15/// [`default_backing_storage`] or [`noop_backing_storage`].
16///
17/// This trait is [sealed]. External crates are not allowed to implement it.
18///
19/// [`default_backing_storage`]: crate::default_backing_storage
20/// [`noop_backing_storage`]: crate::noop_backing_storage
21/// [`TurboTasksBackend::new`]: crate::TurboTasksBackend::new
22/// [sealed]: https://predr.ag/blog/definitive-guide-to-sealed-traits-in-rust/
23pub trait BackingStorage: BackingStorageSealed {
24    /// Called when the database should be invalidated upon re-initialization.
25    ///
26    /// This typically means that we'll restart the process or `turbo-tasks` soon with a fresh
27    /// database. If this happens, there's no point in writing anything else to disk, or flushing
28    /// during [`KeyValueDatabase::shutdown`].
29    ///
30    /// This can be implemented by calling [`invalidate_db`] with
31    /// the database's non-versioned base path.
32    ///
33    /// [`KeyValueDatabase::shutdown`]: crate::database::key_value_database::KeyValueDatabase::shutdown
34    /// [`invalidate_db`]: crate::database::db_invalidation::invalidate_db
35    fn invalidate(&self, reason_code: &str) -> Result<()>;
36}
37
38/// Private methods used by [`BackingStorage`]. This trait is `pub` (because of the sealed-trait
39/// pattern), but should not be exported outside of the crate.
40///
41/// [`BackingStorage`] is exported for documentation reasons and to expose the public
42/// [`BackingStorage::invalidate`] method.
43pub trait BackingStorageSealed: 'static + Send + Sync {
44    type ReadTransaction<'l>;
45    fn next_free_task_id(&self) -> Result<TaskId>;
46    fn next_session_id(&self) -> Result<SessionId>;
47    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>>;
48    #[allow(clippy::ptr_arg)]
49    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>>;
50    fn save_snapshot<I>(
51        &self,
52        session_id: SessionId,
53        operations: Vec<Arc<AnyOperation>>,
54        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
55        snapshots: Vec<I>,
56    ) -> Result<()>
57    where
58        I: Iterator<
59                Item = (
60                    TaskId,
61                    Option<SmallVec<[u8; 16]>>,
62                    Option<SmallVec<[u8; 16]>>,
63                ),
64            > + Send
65            + Sync;
66    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>>;
67    /// # Safety
68    ///
69    /// `tx` must be a transaction from this BackingStorage instance.
70    unsafe fn forward_lookup_task_cache(
71        &self,
72        tx: Option<&Self::ReadTransaction<'_>>,
73        key: &CachedTaskType,
74    ) -> Result<Option<TaskId>>;
75    /// # Safety
76    ///
77    /// `tx` must be a transaction from this BackingStorage instance.
78    unsafe fn reverse_lookup_task_cache(
79        &self,
80        tx: Option<&Self::ReadTransaction<'_>>,
81        task_id: TaskId,
82    ) -> Result<Option<Arc<CachedTaskType>>>;
83    /// # Safety
84    ///
85    /// `tx` must be a transaction from this BackingStorage instance.
86    unsafe fn lookup_data(
87        &self,
88        tx: Option<&Self::ReadTransaction<'_>>,
89        task_id: TaskId,
90        category: TaskDataCategory,
91    ) -> Result<Vec<CachedDataItem>>;
92
93    fn shutdown(&self) -> Result<()> {
94        Ok(())
95    }
96}
97
98impl<L, R> BackingStorage for Either<L, R>
99where
100    L: BackingStorage,
101    R: BackingStorage,
102{
103    fn invalidate(&self, reason_code: &str) -> Result<()> {
104        either::for_both!(self, this => this.invalidate(reason_code))
105    }
106}
107
108impl<L, R> BackingStorageSealed for Either<L, R>
109where
110    L: BackingStorageSealed,
111    R: BackingStorageSealed,
112{
113    type ReadTransaction<'l> = Either<L::ReadTransaction<'l>, R::ReadTransaction<'l>>;
114
115    fn next_free_task_id(&self) -> Result<TaskId> {
116        either::for_both!(self, this => this.next_free_task_id())
117    }
118
119    fn next_session_id(&self) -> Result<SessionId> {
120        either::for_both!(self, this => this.next_session_id())
121    }
122
123    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
124        either::for_both!(self, this => this.uncompleted_operations())
125    }
126
127    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
128        either::for_both!(self, this => this.serialize(task, data))
129    }
130
131    fn save_snapshot<I>(
132        &self,
133        session_id: SessionId,
134        operations: Vec<Arc<AnyOperation>>,
135        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
136        snapshots: Vec<I>,
137    ) -> Result<()>
138    where
139        I: Iterator<
140                Item = (
141                    TaskId,
142                    Option<SmallVec<[u8; 16]>>,
143                    Option<SmallVec<[u8; 16]>>,
144                ),
145            > + Send
146            + Sync,
147    {
148        either::for_both!(self, this => this.save_snapshot(
149            session_id,
150            operations,
151            task_cache_updates,
152            snapshots,
153        ))
154    }
155
156    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
157        Some(match self {
158            Either::Left(this) => Either::Left(this.start_read_transaction()?),
159            Either::Right(this) => Either::Right(this.start_read_transaction()?),
160        })
161    }
162
163    unsafe fn forward_lookup_task_cache(
164        &self,
165        tx: Option<&Self::ReadTransaction<'_>>,
166        key: &CachedTaskType,
167    ) -> Result<Option<TaskId>> {
168        match self {
169            Either::Left(this) => {
170                let tx = tx.map(|tx| read_transaction_left_or_panic(tx.as_ref()));
171                unsafe { this.forward_lookup_task_cache(tx, key) }
172            }
173            Either::Right(this) => {
174                let tx = tx.map(|tx| read_transaction_right_or_panic(tx.as_ref()));
175                unsafe { this.forward_lookup_task_cache(tx, key) }
176            }
177        }
178    }
179
180    unsafe fn reverse_lookup_task_cache(
181        &self,
182        tx: Option<&Self::ReadTransaction<'_>>,
183        task_id: TaskId,
184    ) -> Result<Option<Arc<CachedTaskType>>> {
185        match self {
186            Either::Left(this) => {
187                let tx = tx.map(|tx| read_transaction_left_or_panic(tx.as_ref()));
188                unsafe { this.reverse_lookup_task_cache(tx, task_id) }
189            }
190            Either::Right(this) => {
191                let tx = tx.map(|tx| read_transaction_right_or_panic(tx.as_ref()));
192                unsafe { this.reverse_lookup_task_cache(tx, task_id) }
193            }
194        }
195    }
196
197    unsafe fn lookup_data(
198        &self,
199        tx: Option<&Self::ReadTransaction<'_>>,
200        task_id: TaskId,
201        category: TaskDataCategory,
202    ) -> Result<Vec<CachedDataItem>> {
203        match self {
204            Either::Left(this) => {
205                let tx = tx.map(|tx| read_transaction_left_or_panic(tx.as_ref()));
206                unsafe { this.lookup_data(tx, task_id, category) }
207            }
208            Either::Right(this) => {
209                let tx = tx.map(|tx| read_transaction_right_or_panic(tx.as_ref()));
210                unsafe { this.lookup_data(tx, task_id, category) }
211            }
212        }
213    }
214}
215
216// similar to `Either::unwrap_left`, but does not require `R: Debug`.
217fn read_transaction_left_or_panic<L, R>(either: Either<L, R>) -> L {
218    match either {
219        Either::Left(l) => l,
220        Either::Right(_) => panic!(
221            "expected ReadTransaction of Either::Left containing {}, received Either::Right type \
222             of {}",
223            type_name::<L>(),
224            type_name::<R>(),
225        ),
226    }
227}
228
229// similar to `Either::unwrap_right`, but does not require `R: Debug`.
230fn read_transaction_right_or_panic<L, R>(either: Either<L, R>) -> R {
231    match either {
232        Either::Left(_) => panic!(
233            "expected ReadTransaction of Either::Right containing {}, received Either::Left type \
234             of {}",
235            type_name::<R>(),
236            type_name::<L>(),
237        ),
238        Either::Right(r) => r,
239    }
240}