Skip to main content

turbo_tasks_backend/
kv_backing_storage.rs

1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_persistence::CommitStats;
12use turbo_tasks::{
13    DynTaskInputs, RawVc, TaskId,
14    macro_helpers::NativeFunction,
15    panic_hooks::{PanicHookGuard, register_panic_hook},
16    parallel,
17};
18
19use crate::{
20    GitVersionInfo,
21    backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
22    backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash_from_components},
23    database::{
24        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
25        db_versioning::handle_db_versioning,
26        key_value_database::KeySpace,
27        turbo::{TurboKeyValueDatabase, TurboWriteBatch},
28        write_batch::WriteBuffer,
29    },
30    db_invalidation::invalidation_reasons,
31};
32
/// Key in [`KeySpace::Infra`] under which the serialized list of uncompleted operations is stored.
const META_KEY_OPERATIONS: u32 = 0;
/// Key in [`KeySpace::Infra`] under which the next free task id is stored as a little-endian
/// `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
35
/// Fixed-width database key: a `u32` encoded as four little-endian bytes.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` in little-endian byte order.
    fn new(value: u32) -> Self {
        IntKey(u32::to_le_bytes(value))
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        let IntKey(bytes) = self;
        bytes.as_slice()
    }
}
49
50fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
51    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
52    Ok(n)
53}
54
// Invalidating the cache on panic is what we want for most users, but it is only a band-aid over
// underlying problems in turbo-tasks.
//
// When invalidation "fixes" the issue after a restart, users typically don't report the bug, and
// we never get to find the root cause.
//
// The overrides below let us skip the cache invalidation / error suppression within Vercel so
// that we feel these pain points ourselves and fix the root causes of bugs.
//
// The environment is read once; the result is cached for the lifetime of the process.
fn should_invalidate_on_panic() -> bool {
    /// `true` when `key` is unset, or set to exactly `""`, `"0"`, or `"false"`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }

    static INVALIDATE_ON_PANIC: LazyLock<bool> = LazyLock::new(|| {
        let skip_requested = !env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC");
        let in_test_mode = !env_is_falsy("__NEXT_TEST_MODE");
        !(skip_requested || in_test_mode)
    });

    *INVALIDATE_ON_PANIC
}
73
/// Shared state behind [`TurboBackingStorage`]. It lives in an [`Arc`] so that the panic hook
/// registered in `open_versioned_on_disk` can hold a [`Weak`] reference to it.
struct TurboBackingStorageInner {
    /// The underlying key-value database that all reads and write batches go through.
    database: TurboKeyValueDatabase,
    /// Used when calling [`TurboBackingStorage::invalidate`]. Can be `None` in the
    /// memory-only/no-op storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop. `None` when no hook was registered (in-memory storage, or invalidate-on-panic
    /// disabled via the environment).
    _panic_hook_guard: Option<PanicHookGuard>,
}
85
/// The higher-level backing storage passed to [`TurboTasksBackend::new`], used by
/// [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level [`TurboKeyValueDatabase`] and adapts it into the persistence operations the
/// backend needs (snapshots, task-candidate lookups, etc.).
///
/// [`TurboTasksBackend::new`]: crate::TurboTasksBackend::new
pub struct TurboBackingStorage {
    // Wrapped in an `Arc` so that the closure passed to `register_panic_hook` can hold a `Weak`
    // reference to `inner` instead of keeping the storage alive.
    inner: Arc<TurboBackingStorageInner>,
}
97
98impl TurboBackingStorage {
99    pub(crate) fn new_in_memory(database: TurboKeyValueDatabase) -> Self {
100        Self {
101            inner: Arc::new(TurboBackingStorageInner {
102                database,
103                base_path: None,
104                invalidated: Mutex::new(false),
105                _panic_hook_guard: None,
106            }),
107        }
108    }
109
110    /// Handles boilerplate logic for an on-disk persisted database with versioning.
111    ///
112    /// - Creates a directory per version, with a maximum number of old versions and performs
113    ///   automatic cleanup of old versions.
114    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
115    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
116    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
117    ///
118    /// Along with returning a [`TurboBackingStorage`], this returns a
119    /// [`StartupCacheState`], which can be used by the application for logging information to the
120    /// user or telemetry about the cache.
121    pub(crate) fn open_versioned_on_disk(
122        base_path: PathBuf,
123        version_info: &GitVersionInfo,
124        is_ci: bool,
125        database: impl FnOnce(PathBuf) -> Result<TurboKeyValueDatabase>,
126    ) -> Result<(Self, StartupCacheState)> {
127        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
128            .context("Failed to check database invalidation and cleanup")?;
129        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
130            .context("Failed to handle database versioning")?;
131        let database = (database)(versioned_path).context("Failed to open database")?;
132        let backing_storage = Self {
133            inner: Arc::new_cyclic(move |weak_inner: &Weak<TurboBackingStorageInner>| {
134                let panic_hook_guard = if should_invalidate_on_panic() {
135                    let weak_inner = weak_inner.clone();
136                    Some(register_panic_hook(Box::new(move |_| {
137                        let Some(inner) = weak_inner.upgrade() else {
138                            return;
139                        };
140                        // If a panic happened that must mean something deep inside of turbopack
141                        // or turbo-tasks failed, and it may be hard to recover. We don't want
142                        // the cache to stick around, as that may persist bugs. Make a
143                        // best-effort attempt to invalidate the database (ignoring failures).
144                        let _ = inner.invalidate(invalidation_reasons::PANIC);
145                    })))
146                } else {
147                    None
148                };
149                TurboBackingStorageInner {
150                    database,
151                    base_path: Some(base_path),
152                    invalidated: Mutex::new(false),
153                    _panic_hook_guard: panic_hook_guard,
154                }
155            }),
156        };
157        Ok((backing_storage, startup_cache_state))
158    }
159}
160
161impl TurboBackingStorageInner {
162    fn invalidate(&self, reason_code: &str) -> Result<()> {
163        // `base_path` is `None` for in-memory backing storage (see `noop_backing_storage`).
164        if let Some(base_path) = &self.base_path {
165            // Invalidation could happen frequently if there's a bunch of panics. We only need to
166            // invalidate once, so grab a lock.
167            let mut invalidated_guard = self
168                .invalidated
169                .lock()
170                .unwrap_or_else(PoisonError::into_inner);
171            if *invalidated_guard {
172                return Ok(());
173            }
174            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
175            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
176            // partial writes.
177            invalidate_db(base_path, reason_code)?;
178            self.database.prevent_writes();
179            // Avoid redundant invalidations from future panics
180            *invalidated_guard = true;
181        }
182        Ok(())
183    }
184
185    /// Used to read the next free task ID from the database.
186    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
187        self.database
188            .get(KeySpace::Infra, IntKey::new(key).as_ref())?
189            .map(as_u32)
190            .transpose()
191    }
192}
193
194impl TurboBackingStorage {
    /// Called when the database should be invalidated upon re-initialization.
    ///
    /// This typically means that we'll restart the process or `turbo-tasks` soon with a fresh
    /// database. If this happens, there's no point in writing anything else to disk, or flushing
    /// during [`TurboTasksBackend::stop`].
    ///
    /// For in-memory storage (no base path) this is a no-op. Once an invalidation has succeeded,
    /// further calls return `Ok(())` without invalidating again.
    ///
    /// # Errors
    ///
    /// Returns the error from [`invalidate_db`] if it fails.
    ///
    /// [`TurboTasksBackend::stop`]: turbo_tasks::backend::Backend::stop
    pub(crate) fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
205
206    pub(crate) fn next_free_task_id(&self) -> Result<TaskId> {
207        Ok(self
208            .inner
209            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
210            .context("Unable to read next free task id from database")?
211            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
212    }
213
214    pub(crate) fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
215        fn get(database: &TurboKeyValueDatabase) -> Result<Vec<AnyOperation>> {
216            let Some(operations) =
217                database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
218            else {
219                return Ok(Vec::new());
220            };
221            let operations = turbo_bincode_decode(operations.borrow())?;
222            Ok(operations)
223        }
224        get(&self.inner.database).context("Unable to read uncompleted operations from database")
225    }
226
227    pub(crate) fn save_snapshot<I>(
228        &self,
229        operations: Vec<Arc<AnyOperation>>,
230        snapshots: Vec<I>,
231    ) -> Result<SnapshotMeta>
232    where
233        I: IntoIterator<Item = SnapshotItem> + Send + Sync,
234    {
235        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
236        let batch = self.inner.database.write_batch()?;
237
238        {
239            let _span = tracing::trace_span!("update task data").entered();
240            let mut snapshot_meta =
241                parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
242                    let mut max_new_task_id = 0;
243                    let mut data_items = 0;
244                    let mut meta_items = 0;
245                    let mut task_cache_items = 0;
246                    for SnapshotItem {
247                        task_id,
248                        meta,
249                        data,
250                        task_type_hash,
251                    } in shard
252                    {
253                        let key = IntKey::new(*task_id);
254                        let key = key.as_ref();
255                        if let Some(meta) = meta {
256                            batch.put(
257                                KeySpace::TaskMeta,
258                                WriteBuffer::Borrowed(key),
259                                WriteBuffer::SmallVec(meta),
260                            )?;
261                            meta_items += 1;
262                        }
263                        if let Some(data) = data {
264                            batch.put(
265                                KeySpace::TaskData,
266                                WriteBuffer::Borrowed(key),
267                                WriteBuffer::SmallVec(data),
268                            )?;
269                            data_items += 1;
270                        }
271                        // Write task cache entry inline if this is a new task
272                        if let Some(task_type_hash) = task_type_hash {
273                            batch.put(
274                                KeySpace::TaskCache,
275                                WriteBuffer::Borrowed(&task_type_hash),
276                                WriteBuffer::Borrowed(key),
277                            )?;
278                            task_cache_items += 1;
279                            max_new_task_id = max_new_task_id.max(*task_id);
280                        }
281                    }
282                    Ok(SnapshotMeta {
283                        data_items,
284                        meta_items,
285                        task_cache_items,
286                        // The on-disk byte totals aren't known until the batch is committed below;
287                        // they're filled in from `CommitStats` after `batch.commit()`.
288                        bytes_written: 0,
289                        bytes_deleted: 0,
290                        max_next_task_id: max_new_task_id,
291                    })
292                })?
293                .into_iter()
294                .reduce(|t1, t2| t1.merge(t2))
295                .unwrap_or_default();
296
297            let span = tracing::trace_span!("flush task data").entered();
298            parallel::try_for_each(
299                &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
300                |&key_space| {
301                    let _span = span.clone().entered();
302                    // Safety: `map_collect_owned` has returned, so no concurrent `put` or
303                    // `delete` on these key spaces are in-flight.
304                    unsafe { batch.flush(key_space) }
305                },
306            )?;
307
308            let mut next_task_id = get_next_free_task_id(&batch)?;
309            next_task_id = next_task_id.max(snapshot_meta.max_next_task_id + 1);
310
311            save_infra(&batch, next_task_id, operations)?;
312            {
313                let _span = tracing::trace_span!("commit").entered();
314                // Byte totals are the physical on-disk bytes (post-compression, including .sst /
315                // .blob / .meta files) produced and removed by the commit.
316                let stats = batch.commit().context("Unable to commit operations")?;
317                snapshot_meta.bytes_written = stats.bytes_written;
318                snapshot_meta.bytes_deleted = stats.bytes_deleted;
319            }
320            Ok(snapshot_meta)
321        }
322    }
323
324    pub(crate) fn lookup_task_candidates(
325        &self,
326        native_fn: &'static NativeFunction,
327        this: Option<RawVc>,
328        arg: &dyn DynTaskInputs,
329    ) -> Result<SmallVec<[TaskId; 1]>> {
330        let inner = &*self.inner;
331        if inner.database.is_empty() {
332            // Checking if the database is empty is a performance optimization
333            // to avoid computing the hash.
334            return Ok(SmallVec::new());
335        }
336        let hash = compute_task_type_hash_from_components(native_fn, this, arg);
337        let buffers = inner
338            .database
339            .get_multiple(KeySpace::TaskCache, &hash)
340            .with_context(|| {
341                format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
342            })?;
343
344        let mut task_ids = SmallVec::with_capacity(buffers.len());
345        for bytes in buffers {
346            let bytes = Borrow::<[u8]>::borrow(&bytes).try_into()?;
347            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
348            task_ids.push(id);
349        }
350        Ok(task_ids)
351    }
352
353    pub(crate) fn lookup_data(
354        &self,
355        task_id: TaskId,
356        category: SpecificTaskDataCategory,
357        storage: &mut TaskStorage,
358    ) -> Result<()> {
359        let inner = &*self.inner;
360        let Some(bytes) = inner
361            .database
362            .get(category.key_space(), IntKey::new(*task_id).as_ref())
363            .with_context(|| {
364                format!("Looking up task storage for {task_id} from database failed")
365            })?
366        else {
367            return Ok(());
368        };
369        let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
370        storage
371            .decode(category, &mut decoder)
372            .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
373    }
374
375    pub(crate) fn batch_lookup_data(
376        &self,
377        task_ids: &[TaskId],
378        category: SpecificTaskDataCategory,
379    ) -> Result<Vec<TaskStorage>> {
380        let inner = &*self.inner;
381        let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
382        let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
383        let bytes = inner
384            .database
385            .batch_get(category.key_space(), &keys)
386            .with_context(|| {
387                format!(
388                    "Looking up typed data for {} tasks from database failed",
389                    task_ids.len()
390                )
391            })?;
392        bytes
393            .into_iter()
394            .map(|opt_bytes| {
395                let mut storage = TaskStorage::new();
396                if let Some(bytes) = opt_bytes {
397                    let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
398                    storage
399                        .decode(category, &mut decoder)
400                        .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
401                }
402                Ok(storage)
403            })
404            .collect::<Result<Vec<_>>>()
405    }
406
    /// Asks the underlying database to compact itself and forwards its result unchanged.
    pub(crate) fn compact(&self) -> Result<Option<CommitStats>> {
        self.inner.database.compact()
    }
410
    /// Shuts down the underlying database, forwarding any error it reports.
    pub(crate) fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
414
    /// Reports whether the underlying database has flagged an unrecoverable write error.
    pub(crate) fn has_unrecoverable_write_error(&self) -> bool {
        self.inner.database.has_unrecoverable_write_error()
    }
418}
419
420fn get_next_free_task_id(batch: &TurboWriteBatch<'_>) -> Result<u32, anyhow::Error> {
421    Ok(
422        match batch.get(
423            KeySpace::Infra,
424            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
425        )? {
426            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
427            None => 1,
428        },
429    )
430}
431
432fn save_infra(
433    batch: &TurboWriteBatch<'_>,
434    next_task_id: u32,
435    operations: Vec<Arc<AnyOperation>>,
436) -> Result<(), anyhow::Error> {
437    batch
438        .put(
439            KeySpace::Infra,
440            WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
441            WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
442        )
443        .context("Unable to write next free task id")?;
444    {
445        let _span =
446            tracing::trace_span!("update operations", operations = operations.len()).entered();
447        let operations =
448            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
449        batch
450            .put(
451                KeySpace::Infra,
452                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
453                WriteBuffer::SmallVec(operations),
454            )
455            .context("Unable to write operations")?;
456    }
457    // Safety: save_infra is called after all concurrent writes to Infra are done.
458    unsafe { batch.flush(KeySpace::Infra)? };
459    Ok(())
460}
461
462#[cfg(test)]
463mod tests {
464    use std::borrow::Borrow;
465
466    use turbo_tasks::TaskId;
467
468    use super::*;
469    use crate::database::{turbo::TurboKeyValueDatabase, write_batch::WriteBuffer};
470
471    /// Helper to write to the database using the concurrent batch API.
472    fn write_task_cache_entry(
473        db: &TurboKeyValueDatabase,
474        hash: u64,
475        task_id: TaskId,
476    ) -> Result<()> {
477        let batch = db.write_batch()?;
478        batch.put(
479            KeySpace::TaskCache,
480            WriteBuffer::Borrowed(&hash.to_le_bytes()),
481            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
482        )?;
483        batch.commit()?;
484        Ok(())
485    }
486
487    /// Tests that `get_multiple` correctly returns multiple TaskIds when the same hash key
488    /// is used (simulating a hash collision scenario).
489    ///
490    /// This is a lower-level test that verifies the database layer correctly handles
491    /// the case where multiple task IDs are stored under the same hash key.
492    #[tokio::test(flavor = "multi_thread")]
493    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
494        let tempdir = tempfile::tempdir()?;
495        let path = tempdir.path();
496
497        // Use is_short_session=true to disable background compaction (which requires turbo-tasks
498        // context)
499        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
500
501        // Simulate a hash collision by writing multiple TaskIds with the same hash key
502        let collision_hash: u64 = 0xDEADBEEF;
503        let task_id_1 = TaskId::try_from(100u32).unwrap();
504        let task_id_2 = TaskId::try_from(200u32).unwrap();
505        let task_id_3 = TaskId::try_from(300u32).unwrap();
506
507        // Write three task IDs under the same hash key (simulating collision)
508        // Each write creates a new SST file, so all three will be returned by get_multiple
509        write_task_cache_entry(&db, collision_hash, task_id_1)?;
510        write_task_cache_entry(&db, collision_hash, task_id_2)?;
511        write_task_cache_entry(&db, collision_hash, task_id_3)?;
512
513        // Now query using get_multiple - should return all three TaskIds
514        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;
515
516        assert_eq!(
517            results.len(),
518            3,
519            "Should return all 3 task IDs for the colliding hash"
520        );
521
522        // Convert results to TaskIds and verify all three are present
523        let mut found_ids: Vec<TaskId> = results
524            .iter()
525            .map(|bytes| {
526                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
527                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
528            })
529            .collect();
530        found_ids.sort_by_key(|id| **id);
531
532        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);
533
534        db.shutdown()?;
535        Ok(())
536    }
537
538    /// Tests that multiple distinct keys written in a single batch with flush can be read back.
539    /// This mirrors the actual save_snapshot pattern: write many TaskCache entries, flush, commit.
540    #[tokio::test(flavor = "multi_thread")]
541    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
542        let tempdir = tempfile::tempdir()?;
543        let path = tempdir.path();
544
545        let n = 100_000;
546        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
547        let task_ids: Vec<TaskId> = (1..=n as u32)
548            .map(|i| TaskId::try_from(i).unwrap())
549            .collect();
550
551        // Write all entries in a single batch with flush (like save_snapshot does)
552        {
553            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
554            let batch = db.write_batch()?;
555
556            for (hash, task_id) in hashes.iter().zip(task_ids.iter()) {
557                batch.put(
558                    KeySpace::TaskCache,
559                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
560                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
561                )?;
562            }
563            // Flush TaskCache (like the new code does)
564            unsafe { batch.flush(KeySpace::TaskCache) }?;
565            batch.commit()?;
566
567            db.shutdown()?;
568        }
569
570        // Reopen and verify all entries are readable
571        {
572            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
573            let mut found = 0;
574            let mut missing = 0;
575            for (hash, expected_id) in hashes.iter().zip(task_ids.iter()) {
576                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
577                if results.is_empty() {
578                    missing += 1;
579                } else {
580                    found += 1;
581                    let bytes: [u8; 4] = Borrow::<[u8]>::borrow(&results[0]).try_into().unwrap();
582                    let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
583                    assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
584                }
585            }
586            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
587            db.shutdown()?;
588        }
589
590        Ok(())
591    }
592}