//! `turbo_tasks_backend/kv_backing_storage.rs` — wraps a low-level key-value database into a
//! [`BackingStorage`] implementation.

1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12    DynTaskInputs, RawVc, TaskId,
13    macro_helpers::NativeFunction,
14    panic_hooks::{PanicHookGuard, register_panic_hook},
15    parallel,
16};
17
18use crate::{
19    GitVersionInfo,
20    backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
21    backing_storage::{
22        BackingStorage, BackingStorageSealed, SnapshotItem, compute_task_type_hash_from_components,
23    },
24    database::{
25        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
26        db_versioning::handle_db_versioning,
27        key_value_database::{KeySpace, KeyValueDatabase},
28        write_batch::{ConcurrentWriteBatch, WriteBuffer},
29    },
30    db_invalidation::invalidation_reasons,
31};
32
/// [`KeySpace::Infra`] key under which the serialized list of uncompleted operations is stored
/// (written by [`save_infra`], read by `uncompleted_operations`).
const META_KEY_OPERATIONS: u32 = 0;
/// [`KeySpace::Infra`] key under which the next free task id is stored as a little-endian `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
35
/// A fixed-size 4-byte database key holding a `u32` in little-endian byte order.
///
/// Used both for the well-known [`KeySpace::Infra`] slots (`META_KEY_*`) and for task-id keys in
/// the task key spaces.
#[derive(Clone, Copy)] // a 4-byte array is trivially copyable; derive eagerly
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as its little-endian byte representation.
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    /// Borrows the encoded key bytes for passing to the database API.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
49
50fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
51    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
52    Ok(n)
53}
54
// We want to invalidate the cache on panic for most users, but this is a band-aid to underlying
// problems in turbo-tasks.
//
// If we invalidate the cache upon panic and it "fixes" the issue upon restart, users typically
// won't report bugs to us, and we'll never find root-causes for these problems.
//
// These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
// feel these pain points and fix the root causes of bugs.
fn should_invalidate_on_panic() -> bool {
    // An env var counts as "falsy" when it is unset, empty, "0", or "false".
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }
    // The environment cannot meaningfully change mid-process, so compute this once.
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
73
/// Shared state behind [`KeyValueDatabaseBackingStorage`]. Held in an [`Arc`] so that the
/// registered panic hook can keep a [`Weak`] reference to it.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    /// The wrapped low-level key-value database.
    database: T,
    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
    /// storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    _panic_hook_guard: Option<PanicHookGuard>,
}
85
/// Public handle over [`KeyValueDatabaseBackingStorageInner`].
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    // wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
90
/// A wrapper type used by [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level key-value database into a higher-level [`BackingStorage`] type.
impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    /// Wraps a purely in-memory database. No `base_path` is stored and no panic hook is
    /// registered, since there is no on-disk state that could need invalidation.
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Handles boilerplate logic for an on-disk persisted database with versioning.
    ///
    /// - Creates a directory per version, with a maximum number of old versions and performs
    ///   automatic cleanup of old versions.
    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
    ///
    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
    /// [`StartupCacheState`], which can be used by the application for logging information to the
    /// user or telemetry about the cache.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        // `Arc::new_cyclic` lets the panic hook capture a `Weak` reference to the very inner
        // value being constructed here.
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            // If the storage was already dropped, there is nothing to invalidate.
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // If a panic happened that must mean something deep inside of turbopack
                            // or turbo-tasks failed, and it may be hard to recover. We don't want
                            // the cache to stick around, as that may persist bugs. Make a
                            // best-effort attempt to invalidate the database (ignoring failures).
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}
161
impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    /// Writes an invalidation marker for the on-disk database and prevents further writes to it.
    ///
    /// No-op for in-memory databases (`base_path` is `None`) and after a previous successful
    /// invalidation. Called from both [`BackingStorage::invalidate`] and the panic hook
    /// registered in `open_versioned_on_disk`.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        // `base_path` can be `None` for a `NoopKvDb`
        if let Some(base_path) = &self.base_path {
            // Invalidation could happen frequently if there's a bunch of panics. We only need to
            // invalidate once, so grab a lock. A poisoned lock is safe to reuse here: the guarded
            // value is a plain bool.
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
            // partial writes.
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            // Avoid redundant invalidations from future panics
            *invalidated_guard = true;
        }
        Ok(())
    }

    /// Reads a `u32` stored under `key` in [`KeySpace::Infra`], e.g. the next free task ID.
    /// Returns `Ok(None)` when the key is absent.
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        self.database
            .get(KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}
194
impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    // Delegates to the inner shared state so that public callers and the panic hook (which only
    // holds a weak reference to `inner`) go through the same invalidation path.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}
202
203impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
204    for KeyValueDatabaseBackingStorage<T>
205{
206    fn next_free_task_id(&self) -> Result<TaskId> {
207        Ok(self
208            .inner
209            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
210            .context("Unable to read next free task id from database")?
211            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
212    }
213
214    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
215        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
216            let Some(operations) =
217                database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
218            else {
219                return Ok(Vec::new());
220            };
221            let operations = turbo_bincode_decode(operations.borrow())?;
222            Ok(operations)
223        }
224        get(&self.inner.database).context("Unable to read uncompleted operations from database")
225    }
226
227    fn save_snapshot<I>(&self, operations: Vec<Arc<AnyOperation>>, snapshots: Vec<I>) -> Result<()>
228    where
229        I: IntoIterator<Item = SnapshotItem> + Send + Sync,
230    {
231        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
232        let batch = self.inner.database.write_batch()?;
233
234        {
235            let _span = tracing::trace_span!("update task data").entered();
236            let max_new_task_id =
237                parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
238                    let mut max_new_task_id = 0;
239                    for SnapshotItem {
240                        task_id,
241                        meta,
242                        data,
243                        task_type_hash,
244                    } in shard
245                    {
246                        let key = IntKey::new(*task_id);
247                        let key = key.as_ref();
248                        if let Some(meta) = meta {
249                            batch.put(
250                                KeySpace::TaskMeta,
251                                WriteBuffer::Borrowed(key),
252                                WriteBuffer::SmallVec(meta),
253                            )?;
254                        }
255                        if let Some(data) = data {
256                            batch.put(
257                                KeySpace::TaskData,
258                                WriteBuffer::Borrowed(key),
259                                WriteBuffer::SmallVec(data),
260                            )?;
261                        }
262                        // Write task cache entry inline if this is a new task
263                        if let Some(task_type_hash) = task_type_hash {
264                            batch.put(
265                                KeySpace::TaskCache,
266                                WriteBuffer::Borrowed(&task_type_hash),
267                                WriteBuffer::Borrowed(key),
268                            )?;
269                            max_new_task_id = max_new_task_id.max(*task_id);
270                        }
271                    }
272                    Ok(max_new_task_id)
273                })?
274                .into_iter()
275                .max()
276                .unwrap_or_default();
277
278            let span = tracing::trace_span!("flush task data").entered();
279            parallel::try_for_each(
280                &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
281                |&key_space| {
282                    let _span = span.clone().entered();
283                    // Safety: `map_collect_owned` has returned, so no concurrent `put` or
284                    // `delete` on these key spaces are in-flight.
285                    unsafe { batch.flush(key_space) }
286                },
287            )?;
288
289            let mut next_task_id = get_next_free_task_id(&batch)?;
290            next_task_id = next_task_id.max(max_new_task_id + 1);
291
292            save_infra(&batch, next_task_id, operations)?;
293            {
294                let _span = tracing::trace_span!("commit").entered();
295                batch.commit().context("Unable to commit operations")?;
296            }
297            Ok(())
298        }
299    }
300
301    fn lookup_task_candidates(
302        &self,
303        native_fn: &'static NativeFunction,
304        this: Option<RawVc>,
305        arg: &dyn DynTaskInputs,
306    ) -> Result<SmallVec<[TaskId; 1]>> {
307        let inner = &*self.inner;
308        if inner.database.is_empty() {
309            // Checking if the database is empty is a performance optimization
310            // to avoid computing the hash.
311            return Ok(SmallVec::new());
312        }
313        let hash = compute_task_type_hash_from_components(native_fn, this, arg);
314        let buffers = inner
315            .database
316            .get_multiple(KeySpace::TaskCache, &hash)
317            .with_context(|| {
318                format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
319            })?;
320
321        let mut task_ids = SmallVec::with_capacity(buffers.len());
322        for bytes in buffers {
323            let bytes = bytes.borrow().try_into()?;
324            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
325            task_ids.push(id);
326        }
327        Ok(task_ids)
328    }
329
330    fn lookup_data(
331        &self,
332        task_id: TaskId,
333        category: SpecificTaskDataCategory,
334        storage: &mut TaskStorage,
335    ) -> Result<()> {
336        let inner = &*self.inner;
337        let Some(bytes) = inner
338            .database
339            .get(category.key_space(), IntKey::new(*task_id).as_ref())
340            .with_context(|| {
341                format!("Looking up task storage for {task_id} from database failed")
342            })?
343        else {
344            return Ok(());
345        };
346        let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
347        storage
348            .decode(category, &mut decoder)
349            .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
350    }
351
352    fn batch_lookup_data(
353        &self,
354        task_ids: &[TaskId],
355        category: SpecificTaskDataCategory,
356    ) -> Result<Vec<TaskStorage>> {
357        let inner = &*self.inner;
358        let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
359        let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
360        let bytes = inner
361            .database
362            .batch_get(category.key_space(), &keys)
363            .with_context(|| {
364                format!(
365                    "Looking up typed data for {} tasks from database failed",
366                    task_ids.len()
367                )
368            })?;
369        bytes
370            .into_iter()
371            .map(|opt_bytes| {
372                let mut storage = TaskStorage::new();
373                if let Some(bytes) = opt_bytes {
374                    let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
375                    storage
376                        .decode(category, &mut decoder)
377                        .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
378                }
379                Ok(storage)
380            })
381            .collect::<Result<Vec<_>>>()
382    }
383
384    fn compact(&self) -> Result<bool> {
385        self.inner.database.compact()
386    }
387
388    fn shutdown(&self) -> Result<()> {
389        self.inner.database.shutdown()
390    }
391
392    fn has_unrecoverable_write_error(&self) -> bool {
393        self.inner.database.has_unrecoverable_write_error()
394    }
395}
396
397fn get_next_free_task_id<'a>(batch: &impl ConcurrentWriteBatch<'a>) -> Result<u32, anyhow::Error> {
398    Ok(
399        match batch.get(
400            KeySpace::Infra,
401            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
402        )? {
403            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
404            None => 1,
405        },
406    )
407}
408
409fn save_infra<'a>(
410    batch: &impl ConcurrentWriteBatch<'a>,
411    next_task_id: u32,
412    operations: Vec<Arc<AnyOperation>>,
413) -> Result<(), anyhow::Error> {
414    batch
415        .put(
416            KeySpace::Infra,
417            WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
418            WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
419        )
420        .context("Unable to write next free task id")?;
421    {
422        let _span =
423            tracing::trace_span!("update operations", operations = operations.len()).entered();
424        let operations =
425            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
426        batch
427            .put(
428                KeySpace::Infra,
429                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
430                WriteBuffer::SmallVec(operations),
431            )
432            .context("Unable to write operations")?;
433    }
434    // Safety: save_infra is called after all concurrent writes to Infra are done.
435    unsafe { batch.flush(KeySpace::Infra)? };
436    Ok(())
437}
438
/// Lower-level tests exercising the key-value database layer that backs
/// [`KeyValueDatabaseBackingStorage`], using the real on-disk `TurboKeyValueDatabase`.
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{
        key_value_database::KeyValueDatabase,
        turbo::TurboKeyValueDatabase,
        write_batch::{ConcurrentWriteBatch, WriteBuffer},
    };

    /// Helper to write to the database using the concurrent batch API.
    /// Writes a single `hash -> task_id` entry to [`KeySpace::TaskCache`] and commits.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&hash.to_le_bytes()),
            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Tests that `get_multiple` correctly returns multiple TaskIds when the same hash key
    /// is used (simulating a hash collision scenario).
    ///
    /// This is a lower-level test that verifies the database layer correctly handles
    /// the case where multiple task IDs are stored under the same hash key.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        // Use is_short_session=true to disable background compaction (which requires turbo-tasks
        // context)
        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;

        // Simulate a hash collision by writing multiple TaskIds with the same hash key
        let collision_hash: u64 = 0xDEADBEEF;
        let task_id_1 = TaskId::try_from(100u32).unwrap();
        let task_id_2 = TaskId::try_from(200u32).unwrap();
        let task_id_3 = TaskId::try_from(300u32).unwrap();

        // Write three task IDs under the same hash key (simulating collision)
        // Each write creates a new SST file, so all three will be returned by get_multiple
        write_task_cache_entry(&db, collision_hash, task_id_1)?;
        write_task_cache_entry(&db, collision_hash, task_id_2)?;
        write_task_cache_entry(&db, collision_hash, task_id_3)?;

        // Now query using get_multiple - should return all three TaskIds
        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;

        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        // Convert results to TaskIds and verify all three are present
        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| {
                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
            })
            .collect();
        // Sort by the numeric id; `get_multiple` makes no ordering guarantee we rely on here.
        found_ids.sort_by_key(|id| **id);

        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);

        db.shutdown()?;
        Ok(())
    }

    /// Tests that multiple distinct keys written in a single batch with flush can be read back.
    /// This mirrors the actual save_snapshot pattern: write many TaskCache entries, flush, commit.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let n = 100_000;
        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
        let task_ids: Vec<TaskId> = (1..=n as u32)
            .map(|i| TaskId::try_from(i).unwrap())
            .collect();

        // Write all entries in a single batch with flush (like save_snapshot does)
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let batch = db.write_batch()?;

            for (hash, task_id) in hashes.iter().zip(task_ids.iter()) {
                batch.put(
                    KeySpace::TaskCache,
                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
                )?;
            }
            // Flush TaskCache (like the new code does)
            unsafe { batch.flush(KeySpace::TaskCache) }?;
            batch.commit()?;

            db.shutdown()?;
        }

        // Reopen and verify all entries are readable
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let mut found = 0;
            let mut missing = 0;
            for (hash, expected_id) in hashes.iter().zip(task_ids.iter()) {
                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
                if results.is_empty() {
                    missing += 1;
                } else {
                    found += 1;
                    let bytes: [u8; 4] = Borrow::<[u8]>::borrow(&results[0]).try_into().unwrap();
                    let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
                    assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
                }
            }
            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
            db.shutdown()?;
        }

        Ok(())
    }
}