Skip to main content

turbo_tasks_backend/
kv_backing_storage.rs

1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12    DynTaskInputs, RawVc, TaskId,
13    macro_helpers::NativeFunction,
14    panic_hooks::{PanicHookGuard, register_panic_hook},
15    parallel,
16};
17
18use crate::{
19    GitVersionInfo,
20    backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
21    backing_storage::{
22        BackingStorage, BackingStorageSealed, SnapshotItem, SnapshotMeta,
23        compute_task_type_hash_from_components,
24    },
25    database::{
26        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
27        db_versioning::handle_db_versioning,
28        key_value_database::{KeySpace, KeyValueDatabase},
29        write_batch::{ConcurrentWriteBatch, WriteBuffer},
30    },
31    db_invalidation::invalidation_reasons,
32};
33
/// Key in [`KeySpace::Infra`] under which the serialized list of uncompleted operations is
/// stored.
const META_KEY_OPERATIONS: u32 = 0;
/// Key in [`KeySpace::Infra`] under which the next free task id is stored as a little-endian
/// `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
36
/// A fixed-width database key: the little-endian byte encoding of a `u32`.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as its four little-endian bytes.
    fn new(value: u32) -> Self {
        let encoded = value.to_le_bytes();
        IntKey(encoded)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key as a byte slice.
    fn as_ref(&self) -> &[u8] {
        let IntKey(bytes) = self;
        bytes
    }
}
50
51fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
52    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
53    Ok(n)
54}
55
56// We want to invalidate the cache on panic for most users, but this is a band-aid to underlying
57// problems in turbo-tasks.
58//
59// If we invalidate the cache upon panic and it "fixes" the issue upon restart, users typically
60// won't report bugs to us, and we'll never find root-causes for these problems.
61//
62// These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
63// feel these pain points and fix the root causes of bugs.
fn should_invalidate_on_panic() -> bool {
    /// An environment variable is "falsy" when it is unset, empty, `0`, or `false`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }

    // The environment is only consulted once; later calls reuse the cached decision.
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        let skip_requested = !env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC");
        !skip_requested && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
74
/// The state behind a [`KeyValueDatabaseBackingStorage`].
///
/// It lives in an `Arc` so that the panic hook registered by
/// `KeyValueDatabaseBackingStorage::open_versioned_on_disk` can hold a weak reference to it.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    /// The underlying key-value database that all reads and writes go through.
    database: T,
    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
    /// storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    _panic_hook_guard: Option<PanicHookGuard>,
}
86
/// A [`BackingStorage`] implementation layered on top of a low-level [`KeyValueDatabase`].
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    // wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
91
/// Constructors for [`KeyValueDatabaseBackingStorage`], the wrapper type used by
/// [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// The wrapper turns a low-level key-value database into a higher-level [`BackingStorage`] type.
95impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
96    pub(crate) fn new_in_memory(database: T) -> Self {
97        Self {
98            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
99                database,
100                base_path: None,
101                invalidated: Mutex::new(false),
102                _panic_hook_guard: None,
103            }),
104        }
105    }
106
107    /// Handles boilerplate logic for an on-disk persisted database with versioning.
108    ///
109    /// - Creates a directory per version, with a maximum number of old versions and performs
110    ///   automatic cleanup of old versions.
111    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
112    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
113    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
114    ///
115    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
116    /// [`StartupCacheState`], which can be used by the application for logging information to the
117    /// user or telemetry about the cache.
118    pub(crate) fn open_versioned_on_disk(
119        base_path: PathBuf,
120        version_info: &GitVersionInfo,
121        is_ci: bool,
122        database: impl FnOnce(PathBuf) -> Result<T>,
123    ) -> Result<(Self, StartupCacheState)>
124    where
125        T: Send + Sync + 'static,
126    {
127        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
128            .context("Failed to check database invalidation and cleanup")?;
129        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
130            .context("Failed to handle database versioning")?;
131        let database = (database)(versioned_path).context("Failed to open database")?;
132        let backing_storage = Self {
133            inner: Arc::new_cyclic(
134                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
135                    let panic_hook_guard = if should_invalidate_on_panic() {
136                        let weak_inner = weak_inner.clone();
137                        Some(register_panic_hook(Box::new(move |_| {
138                            let Some(inner) = weak_inner.upgrade() else {
139                                return;
140                            };
141                            // If a panic happened that must mean something deep inside of turbopack
142                            // or turbo-tasks failed, and it may be hard to recover. We don't want
143                            // the cache to stick around, as that may persist bugs. Make a
144                            // best-effort attempt to invalidate the database (ignoring failures).
145                            let _ = inner.invalidate(invalidation_reasons::PANIC);
146                        })))
147                    } else {
148                        None
149                    };
150                    KeyValueDatabaseBackingStorageInner {
151                        database,
152                        base_path: Some(base_path),
153                        invalidated: Mutex::new(false),
154                        _panic_hook_guard: panic_hook_guard,
155                    }
156                },
157            ),
158        };
159        Ok((backing_storage, startup_cache_state))
160    }
161}
162
163impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
164    fn invalidate(&self, reason_code: &str) -> Result<()> {
165        // `base_path` can be `None` for a `NoopKvDb`
166        if let Some(base_path) = &self.base_path {
167            // Invalidation could happen frequently if there's a bunch of panics. We only need to
168            // invalidate once, so grab a lock.
169            let mut invalidated_guard = self
170                .invalidated
171                .lock()
172                .unwrap_or_else(PoisonError::into_inner);
173            if *invalidated_guard {
174                return Ok(());
175            }
176            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
177            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
178            // partial writes.
179            invalidate_db(base_path, reason_code)?;
180            self.database.prevent_writes();
181            // Avoid redundant invalidations from future panics
182            *invalidated_guard = true;
183        }
184        Ok(())
185    }
186
187    /// Used to read the next free task ID from the database.
188    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
189        self.database
190            .get(KeySpace::Infra, IntKey::new(key).as_ref())?
191            .map(as_u32)
192            .transpose()
193    }
194}
195
impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    /// Invalidates the database by delegating to the shared inner state, passing `reason_code`
    /// through unchanged.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}
203
204impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
205    for KeyValueDatabaseBackingStorage<T>
206{
207    fn next_free_task_id(&self) -> Result<TaskId> {
208        Ok(self
209            .inner
210            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
211            .context("Unable to read next free task id from database")?
212            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
213    }
214
215    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
216        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
217            let Some(operations) =
218                database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
219            else {
220                return Ok(Vec::new());
221            };
222            let operations = turbo_bincode_decode(operations.borrow())?;
223            Ok(operations)
224        }
225        get(&self.inner.database).context("Unable to read uncompleted operations from database")
226    }
227
    /// Persists a snapshot of task state together with the pending `operations`.
    ///
    /// The work happens in one write batch, in this order:
    ///
    /// 1. Every shard in `snapshots` is processed through `parallel::map_collect_owned`; each
    ///    [`SnapshotItem`] contributes up to three entries (task meta, task data, and a task
    ///    cache entry mapping the task type hash to the task id).
    /// 2. The `TaskMeta`, `TaskData` and `TaskCache` key spaces are flushed.
    /// 3. The next free task id is read back through the batch, raised to one past the highest
    ///    newly cached task id if necessary, and written along with `operations` by
    ///    [`save_infra`].
    /// 4. The batch is committed.
    ///
    /// Returns the per-shard [`SnapshotMeta`] counters merged into one value (the default value
    /// when `snapshots` is empty).
    ///
    /// # Errors
    ///
    /// Fails if creating the batch, any `put`, any flush, reading the next free task id, writing
    /// the infra entries, or the commit fails.
    fn save_snapshot<I>(
        &self,
        operations: Vec<Arc<AnyOperation>>,
        snapshots: Vec<I>,
    ) -> Result<SnapshotMeta>
    where
        I: IntoIterator<Item = SnapshotItem> + Send + Sync,
    {
        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
        let batch = self.inner.database.write_batch()?;

        {
            // NOTE: this span guard lives until the end of the enclosing block, so it also covers
            // the flush, infra write and commit below.
            let _span = tracing::trace_span!("update task data").entered();
            let snapshot_meta =
                parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
                    // Per-shard counters; merged across shards after the map completes.
                    let mut max_new_task_id = 0;
                    let mut data_items = 0;
                    let mut meta_items = 0;
                    let mut task_cache_items = 0;
                    for SnapshotItem {
                        task_id,
                        meta,
                        data,
                        task_type_hash,
                    } in shard
                    {
                        // The same little-endian task id bytes serve as the key for meta and
                        // data, and as the value of the task cache entry.
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch.put(
                                KeySpace::TaskMeta,
                                WriteBuffer::Borrowed(key),
                                WriteBuffer::SmallVec(meta),
                            )?;
                            meta_items += 1;
                        }
                        if let Some(data) = data {
                            batch.put(
                                KeySpace::TaskData,
                                WriteBuffer::Borrowed(key),
                                WriteBuffer::SmallVec(data),
                            )?;
                            data_items += 1;
                        }
                        // Write task cache entry inline if this is a new task
                        if let Some(task_type_hash) = task_type_hash {
                            batch.put(
                                KeySpace::TaskCache,
                                WriteBuffer::Borrowed(&task_type_hash),
                                WriteBuffer::Borrowed(key),
                            )?;
                            task_cache_items += 1;
                            // Only items carrying a task type hash raise the maximum id.
                            max_new_task_id = max_new_task_id.max(*task_id);
                        }
                    }
                    Ok(SnapshotMeta {
                        data_items,
                        meta_items,
                        task_cache_items,
                        max_next_task_id: max_new_task_id,
                    })
                })?
                .into_iter()
                .reduce(|t1, t2| t1.merge(t2))
                .unwrap_or_default();

            let span = tracing::trace_span!("flush task data").entered();
            parallel::try_for_each(
                &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
                |&key_space| {
                    let _span = span.clone().entered();
                    // Safety: `map_collect_owned` has returned, so no concurrent `put` or
                    // `delete` on these key spaces are in-flight.
                    unsafe { batch.flush(key_space) }
                },
            )?;

            // Never move the persisted next free task id backwards: keep the stored value unless
            // this snapshot cached a task with an id at or above it.
            let mut next_task_id = get_next_free_task_id(&batch)?;
            next_task_id = next_task_id.max(snapshot_meta.max_next_task_id + 1);

            save_infra(&batch, next_task_id, operations)?;
            {
                let _span = tracing::trace_span!("commit").entered();
                batch.commit().context("Unable to commit operations")?;
            }
            Ok(snapshot_meta)
        }
    }
316
317    fn lookup_task_candidates(
318        &self,
319        native_fn: &'static NativeFunction,
320        this: Option<RawVc>,
321        arg: &dyn DynTaskInputs,
322    ) -> Result<SmallVec<[TaskId; 1]>> {
323        let inner = &*self.inner;
324        if inner.database.is_empty() {
325            // Checking if the database is empty is a performance optimization
326            // to avoid computing the hash.
327            return Ok(SmallVec::new());
328        }
329        let hash = compute_task_type_hash_from_components(native_fn, this, arg);
330        let buffers = inner
331            .database
332            .get_multiple(KeySpace::TaskCache, &hash)
333            .with_context(|| {
334                format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
335            })?;
336
337        let mut task_ids = SmallVec::with_capacity(buffers.len());
338        for bytes in buffers {
339            let bytes = bytes.borrow().try_into()?;
340            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
341            task_ids.push(id);
342        }
343        Ok(task_ids)
344    }
345
346    fn lookup_data(
347        &self,
348        task_id: TaskId,
349        category: SpecificTaskDataCategory,
350        storage: &mut TaskStorage,
351    ) -> Result<()> {
352        let inner = &*self.inner;
353        let Some(bytes) = inner
354            .database
355            .get(category.key_space(), IntKey::new(*task_id).as_ref())
356            .with_context(|| {
357                format!("Looking up task storage for {task_id} from database failed")
358            })?
359        else {
360            return Ok(());
361        };
362        let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
363        storage
364            .decode(category, &mut decoder)
365            .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
366    }
367
368    fn batch_lookup_data(
369        &self,
370        task_ids: &[TaskId],
371        category: SpecificTaskDataCategory,
372    ) -> Result<Vec<TaskStorage>> {
373        let inner = &*self.inner;
374        let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
375        let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
376        let bytes = inner
377            .database
378            .batch_get(category.key_space(), &keys)
379            .with_context(|| {
380                format!(
381                    "Looking up typed data for {} tasks from database failed",
382                    task_ids.len()
383                )
384            })?;
385        bytes
386            .into_iter()
387            .map(|opt_bytes| {
388                let mut storage = TaskStorage::new();
389                if let Some(bytes) = opt_bytes {
390                    let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
391                    storage
392                        .decode(category, &mut decoder)
393                        .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
394                }
395                Ok(storage)
396            })
397            .collect::<Result<Vec<_>>>()
398    }
399
    /// Forwards to the underlying database's `compact` and returns its result unchanged.
    fn compact(&self) -> Result<bool> {
        self.inner.database.compact()
    }
403
    /// Forwards to the underlying database's `shutdown` and returns its result unchanged.
    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
407
    /// Reports whether the underlying database says it hit an unrecoverable write error.
    fn has_unrecoverable_write_error(&self) -> bool {
        self.inner.database.has_unrecoverable_write_error()
    }
411}
412
413fn get_next_free_task_id<'a>(batch: &impl ConcurrentWriteBatch<'a>) -> Result<u32, anyhow::Error> {
414    Ok(
415        match batch.get(
416            KeySpace::Infra,
417            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
418        )? {
419            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
420            None => 1,
421        },
422    )
423}
424
425fn save_infra<'a>(
426    batch: &impl ConcurrentWriteBatch<'a>,
427    next_task_id: u32,
428    operations: Vec<Arc<AnyOperation>>,
429) -> Result<(), anyhow::Error> {
430    batch
431        .put(
432            KeySpace::Infra,
433            WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
434            WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
435        )
436        .context("Unable to write next free task id")?;
437    {
438        let _span =
439            tracing::trace_span!("update operations", operations = operations.len()).entered();
440        let operations =
441            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
442        batch
443            .put(
444                KeySpace::Infra,
445                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
446                WriteBuffer::SmallVec(operations),
447            )
448            .context("Unable to write operations")?;
449    }
450    // Safety: save_infra is called after all concurrent writes to Infra are done.
451    unsafe { batch.flush(KeySpace::Infra)? };
452    Ok(())
453}
454
455#[cfg(test)]
456mod tests {
457    use std::borrow::Borrow;
458
459    use turbo_tasks::TaskId;
460
461    use super::*;
462    use crate::database::{
463        key_value_database::KeyValueDatabase,
464        turbo::TurboKeyValueDatabase,
465        write_batch::{ConcurrentWriteBatch, WriteBuffer},
466    };
467
468    /// Helper to write to the database using the concurrent batch API.
469    fn write_task_cache_entry(
470        db: &TurboKeyValueDatabase,
471        hash: u64,
472        task_id: TaskId,
473    ) -> Result<()> {
474        let batch = db.write_batch()?;
475        batch.put(
476            KeySpace::TaskCache,
477            WriteBuffer::Borrowed(&hash.to_le_bytes()),
478            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
479        )?;
480        batch.commit()?;
481        Ok(())
482    }
483
484    /// Tests that `get_multiple` correctly returns multiple TaskIds when the same hash key
485    /// is used (simulating a hash collision scenario).
486    ///
487    /// This is a lower-level test that verifies the database layer correctly handles
488    /// the case where multiple task IDs are stored under the same hash key.
489    #[tokio::test(flavor = "multi_thread")]
490    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
491        let tempdir = tempfile::tempdir()?;
492        let path = tempdir.path();
493
494        // Use is_short_session=true to disable background compaction (which requires turbo-tasks
495        // context)
496        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
497
498        // Simulate a hash collision by writing multiple TaskIds with the same hash key
499        let collision_hash: u64 = 0xDEADBEEF;
500        let task_id_1 = TaskId::try_from(100u32).unwrap();
501        let task_id_2 = TaskId::try_from(200u32).unwrap();
502        let task_id_3 = TaskId::try_from(300u32).unwrap();
503
504        // Write three task IDs under the same hash key (simulating collision)
505        // Each write creates a new SST file, so all three will be returned by get_multiple
506        write_task_cache_entry(&db, collision_hash, task_id_1)?;
507        write_task_cache_entry(&db, collision_hash, task_id_2)?;
508        write_task_cache_entry(&db, collision_hash, task_id_3)?;
509
510        // Now query using get_multiple - should return all three TaskIds
511        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;
512
513        assert_eq!(
514            results.len(),
515            3,
516            "Should return all 3 task IDs for the colliding hash"
517        );
518
519        // Convert results to TaskIds and verify all three are present
520        let mut found_ids: Vec<TaskId> = results
521            .iter()
522            .map(|bytes| {
523                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
524                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
525            })
526            .collect();
527        found_ids.sort_by_key(|id| **id);
528
529        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);
530
531        db.shutdown()?;
532        Ok(())
533    }
534
535    /// Tests that multiple distinct keys written in a single batch with flush can be read back.
536    /// This mirrors the actual save_snapshot pattern: write many TaskCache entries, flush, commit.
537    #[tokio::test(flavor = "multi_thread")]
538    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
539        let tempdir = tempfile::tempdir()?;
540        let path = tempdir.path();
541
542        let n = 100_000;
543        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
544        let task_ids: Vec<TaskId> = (1..=n as u32)
545            .map(|i| TaskId::try_from(i).unwrap())
546            .collect();
547
548        // Write all entries in a single batch with flush (like save_snapshot does)
549        {
550            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
551            let batch = db.write_batch()?;
552
553            for (hash, task_id) in hashes.iter().zip(task_ids.iter()) {
554                batch.put(
555                    KeySpace::TaskCache,
556                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
557                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
558                )?;
559            }
560            // Flush TaskCache (like the new code does)
561            unsafe { batch.flush(KeySpace::TaskCache) }?;
562            batch.commit()?;
563
564            db.shutdown()?;
565        }
566
567        // Reopen and verify all entries are readable
568        {
569            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
570            let mut found = 0;
571            let mut missing = 0;
572            for (hash, expected_id) in hashes.iter().zip(task_ids.iter()) {
573                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
574                if results.is_empty() {
575                    missing += 1;
576                } else {
577                    found += 1;
578                    let bytes: [u8; 4] = Borrow::<[u8]>::borrow(&results[0]).try_into().unwrap();
579                    let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
580                    assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
581                }
582            }
583            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
584            db.shutdown()?;
585        }
586
587        Ok(())
588    }
589}