Skip to main content

turbo_tasks_backend/
kv_backing_storage.rs

1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12    DynTaskInputs, RawVc, TaskId,
13    macro_helpers::NativeFunction,
14    panic_hooks::{PanicHookGuard, register_panic_hook},
15    parallel,
16};
17
18use crate::{
19    GitVersionInfo,
20    backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
21    backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash_from_components},
22    database::{
23        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
24        db_versioning::handle_db_versioning,
25        key_value_database::KeySpace,
26        turbo::{TurboKeyValueDatabase, TurboWriteBatch},
27        write_batch::WriteBuffer,
28    },
29    db_invalidation::invalidation_reasons,
30};
31
/// Key in [`KeySpace::Infra`] under which the encoded list of uncompleted operations is
/// stored.
const META_KEY_OPERATIONS: u32 = 0;
/// Key in [`KeySpace::Infra`] under which the next free task id is stored as a little-endian
/// `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
34
/// Fixed-width database key: a `u32` stored as its four little-endian bytes.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` in little-endian byte order.
    fn new(value: u32) -> Self {
        IntKey(u32::to_le_bytes(value))
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key as a byte slice.
    fn as_ref(&self) -> &[u8] {
        let IntKey(bytes) = self;
        bytes
    }
}
48
49fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
50    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
51    Ok(n)
52}
53
// Invalidating the cache after a panic is the right default for most users, but it only papers
// over underlying problems in turbo-tasks.
//
// When an invalidation "fixes" the problem on restart, users rarely report the bug, so the root
// cause never reaches us.
//
// The environment overrides below turn off the cache invalidation / error suppression inside
// Vercel, so that we feel these pain points ourselves and fix the root causes.
fn should_invalidate_on_panic() -> bool {
    /// A variable counts as falsy when it is unset, empty, `0`, or `false`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }
    // Computed on first use and cached; later changes to the environment are not observed.
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        ["TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC", "__NEXT_TEST_MODE"]
            .iter()
            .all(|key| env_is_falsy(key))
    });
    *SHOULD_INVALIDATE
}
72
/// Shared state behind [`TurboBackingStorage`]. It is held in an [`Arc`] so that the panic hook
/// can refer to it through a [`Weak`] reference.
struct TurboBackingStorageInner {
    /// The underlying key-value database that all reads and writes go through.
    database: TurboKeyValueDatabase,
    /// Used when calling [`TurboBackingStorage::invalidate`]. Can be `None` in the
    /// memory-only/no-op storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    _panic_hook_guard: Option<PanicHookGuard>,
}
84
/// The higher-level backing storage passed to [`TurboTasksBackend::new`], used by
/// [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level [`TurboKeyValueDatabase`] and adapts it into the persistence operations the
/// backend needs (snapshots, task-candidate lookups, etc.).
///
/// [`TurboTasksBackend::new`]: crate::TurboTasksBackend::new
pub struct TurboBackingStorage {
    // Wrapped in an `Arc` so that the hook passed to `register_panic_hook` can hold a weak
    // reference to `inner` instead of keeping the storage alive.
    inner: Arc<TurboBackingStorageInner>,
}
96
97impl TurboBackingStorage {
98    pub(crate) fn new_in_memory(database: TurboKeyValueDatabase) -> Self {
99        Self {
100            inner: Arc::new(TurboBackingStorageInner {
101                database,
102                base_path: None,
103                invalidated: Mutex::new(false),
104                _panic_hook_guard: None,
105            }),
106        }
107    }
108
109    /// Handles boilerplate logic for an on-disk persisted database with versioning.
110    ///
111    /// - Creates a directory per version, with a maximum number of old versions and performs
112    ///   automatic cleanup of old versions.
113    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
114    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
115    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
116    ///
117    /// Along with returning a [`TurboBackingStorage`], this returns a
118    /// [`StartupCacheState`], which can be used by the application for logging information to the
119    /// user or telemetry about the cache.
120    pub(crate) fn open_versioned_on_disk(
121        base_path: PathBuf,
122        version_info: &GitVersionInfo,
123        is_ci: bool,
124        database: impl FnOnce(PathBuf) -> Result<TurboKeyValueDatabase>,
125    ) -> Result<(Self, StartupCacheState)> {
126        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
127            .context("Failed to check database invalidation and cleanup")?;
128        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
129            .context("Failed to handle database versioning")?;
130        let database = (database)(versioned_path).context("Failed to open database")?;
131        let backing_storage = Self {
132            inner: Arc::new_cyclic(move |weak_inner: &Weak<TurboBackingStorageInner>| {
133                let panic_hook_guard = if should_invalidate_on_panic() {
134                    let weak_inner = weak_inner.clone();
135                    Some(register_panic_hook(Box::new(move |_| {
136                        let Some(inner) = weak_inner.upgrade() else {
137                            return;
138                        };
139                        // If a panic happened that must mean something deep inside of turbopack
140                        // or turbo-tasks failed, and it may be hard to recover. We don't want
141                        // the cache to stick around, as that may persist bugs. Make a
142                        // best-effort attempt to invalidate the database (ignoring failures).
143                        let _ = inner.invalidate(invalidation_reasons::PANIC);
144                    })))
145                } else {
146                    None
147                };
148                TurboBackingStorageInner {
149                    database,
150                    base_path: Some(base_path),
151                    invalidated: Mutex::new(false),
152                    _panic_hook_guard: panic_hook_guard,
153                }
154            }),
155        };
156        Ok((backing_storage, startup_cache_state))
157    }
158}
159
160impl TurboBackingStorageInner {
161    fn invalidate(&self, reason_code: &str) -> Result<()> {
162        // `base_path` is `None` for in-memory backing storage (see `noop_backing_storage`).
163        if let Some(base_path) = &self.base_path {
164            // Invalidation could happen frequently if there's a bunch of panics. We only need to
165            // invalidate once, so grab a lock.
166            let mut invalidated_guard = self
167                .invalidated
168                .lock()
169                .unwrap_or_else(PoisonError::into_inner);
170            if *invalidated_guard {
171                return Ok(());
172            }
173            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
174            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
175            // partial writes.
176            invalidate_db(base_path, reason_code)?;
177            self.database.prevent_writes();
178            // Avoid redundant invalidations from future panics
179            *invalidated_guard = true;
180        }
181        Ok(())
182    }
183
184    /// Used to read the next free task ID from the database.
185    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
186        self.database
187            .get(KeySpace::Infra, IntKey::new(key).as_ref())?
188            .map(as_u32)
189            .transpose()
190    }
191}
192
193impl TurboBackingStorage {
194    /// Called when the database should be invalidated upon re-initialization.
195    ///
196    /// This typically means that we'll restart the process or `turbo-tasks` soon with a fresh
197    /// database. If this happens, there's no point in writing anything else to disk, or flushing
198    /// during [`TurboTasksBackend::stop`].
199    ///
200    /// [`TurboTasksBackend::stop`]: turbo_tasks::backend::Backend::stop
201    pub(crate) fn invalidate(&self, reason_code: &str) -> Result<()> {
202        self.inner.invalidate(reason_code)
203    }
204
205    pub(crate) fn next_free_task_id(&self) -> Result<TaskId> {
206        Ok(self
207            .inner
208            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
209            .context("Unable to read next free task id from database")?
210            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
211    }
212
213    pub(crate) fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
214        fn get(database: &TurboKeyValueDatabase) -> Result<Vec<AnyOperation>> {
215            let Some(operations) =
216                database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
217            else {
218                return Ok(Vec::new());
219            };
220            let operations = turbo_bincode_decode(operations.borrow())?;
221            Ok(operations)
222        }
223        get(&self.inner.database).context("Unable to read uncompleted operations from database")
224    }
225
226    pub(crate) fn save_snapshot<I>(
227        &self,
228        operations: Vec<Arc<AnyOperation>>,
229        snapshots: Vec<I>,
230    ) -> Result<SnapshotMeta>
231    where
232        I: IntoIterator<Item = SnapshotItem> + Send + Sync,
233    {
234        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
235        let batch = self.inner.database.write_batch()?;
236
237        {
238            let _span = tracing::trace_span!("update task data").entered();
239            let snapshot_meta =
240                parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
241                    let mut max_new_task_id = 0;
242                    let mut data_items = 0;
243                    let mut meta_items = 0;
244                    let mut task_cache_items = 0;
245                    for SnapshotItem {
246                        task_id,
247                        meta,
248                        data,
249                        task_type_hash,
250                    } in shard
251                    {
252                        let key = IntKey::new(*task_id);
253                        let key = key.as_ref();
254                        if let Some(meta) = meta {
255                            batch.put(
256                                KeySpace::TaskMeta,
257                                WriteBuffer::Borrowed(key),
258                                WriteBuffer::SmallVec(meta),
259                            )?;
260                            meta_items += 1;
261                        }
262                        if let Some(data) = data {
263                            batch.put(
264                                KeySpace::TaskData,
265                                WriteBuffer::Borrowed(key),
266                                WriteBuffer::SmallVec(data),
267                            )?;
268                            data_items += 1;
269                        }
270                        // Write task cache entry inline if this is a new task
271                        if let Some(task_type_hash) = task_type_hash {
272                            batch.put(
273                                KeySpace::TaskCache,
274                                WriteBuffer::Borrowed(&task_type_hash),
275                                WriteBuffer::Borrowed(key),
276                            )?;
277                            task_cache_items += 1;
278                            max_new_task_id = max_new_task_id.max(*task_id);
279                        }
280                    }
281                    Ok(SnapshotMeta {
282                        data_items,
283                        meta_items,
284                        task_cache_items,
285                        max_next_task_id: max_new_task_id,
286                    })
287                })?
288                .into_iter()
289                .reduce(|t1, t2| t1.merge(t2))
290                .unwrap_or_default();
291
292            let span = tracing::trace_span!("flush task data").entered();
293            parallel::try_for_each(
294                &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
295                |&key_space| {
296                    let _span = span.clone().entered();
297                    // Safety: `map_collect_owned` has returned, so no concurrent `put` or
298                    // `delete` on these key spaces are in-flight.
299                    unsafe { batch.flush(key_space) }
300                },
301            )?;
302
303            let mut next_task_id = get_next_free_task_id(&batch)?;
304            next_task_id = next_task_id.max(snapshot_meta.max_next_task_id + 1);
305
306            save_infra(&batch, next_task_id, operations)?;
307            {
308                let _span = tracing::trace_span!("commit").entered();
309                batch.commit().context("Unable to commit operations")?;
310            }
311            Ok(snapshot_meta)
312        }
313    }
314
315    pub(crate) fn lookup_task_candidates(
316        &self,
317        native_fn: &'static NativeFunction,
318        this: Option<RawVc>,
319        arg: &dyn DynTaskInputs,
320    ) -> Result<SmallVec<[TaskId; 1]>> {
321        let inner = &*self.inner;
322        if inner.database.is_empty() {
323            // Checking if the database is empty is a performance optimization
324            // to avoid computing the hash.
325            return Ok(SmallVec::new());
326        }
327        let hash = compute_task_type_hash_from_components(native_fn, this, arg);
328        let buffers = inner
329            .database
330            .get_multiple(KeySpace::TaskCache, &hash)
331            .with_context(|| {
332                format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
333            })?;
334
335        let mut task_ids = SmallVec::with_capacity(buffers.len());
336        for bytes in buffers {
337            let bytes = Borrow::<[u8]>::borrow(&bytes).try_into()?;
338            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
339            task_ids.push(id);
340        }
341        Ok(task_ids)
342    }
343
344    pub(crate) fn lookup_data(
345        &self,
346        task_id: TaskId,
347        category: SpecificTaskDataCategory,
348        storage: &mut TaskStorage,
349    ) -> Result<()> {
350        let inner = &*self.inner;
351        let Some(bytes) = inner
352            .database
353            .get(category.key_space(), IntKey::new(*task_id).as_ref())
354            .with_context(|| {
355                format!("Looking up task storage for {task_id} from database failed")
356            })?
357        else {
358            return Ok(());
359        };
360        let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
361        storage
362            .decode(category, &mut decoder)
363            .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
364    }
365
366    pub(crate) fn batch_lookup_data(
367        &self,
368        task_ids: &[TaskId],
369        category: SpecificTaskDataCategory,
370    ) -> Result<Vec<TaskStorage>> {
371        let inner = &*self.inner;
372        let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
373        let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
374        let bytes = inner
375            .database
376            .batch_get(category.key_space(), &keys)
377            .with_context(|| {
378                format!(
379                    "Looking up typed data for {} tasks from database failed",
380                    task_ids.len()
381                )
382            })?;
383        bytes
384            .into_iter()
385            .map(|opt_bytes| {
386                let mut storage = TaskStorage::new();
387                if let Some(bytes) = opt_bytes {
388                    let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
389                    storage
390                        .decode(category, &mut decoder)
391                        .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
392                }
393                Ok(storage)
394            })
395            .collect::<Result<Vec<_>>>()
396    }
397
398    pub(crate) fn compact(&self) -> Result<bool> {
399        self.inner.database.compact()
400    }
401
402    pub(crate) fn shutdown(&self) -> Result<()> {
403        self.inner.database.shutdown()
404    }
405
406    pub(crate) fn has_unrecoverable_write_error(&self) -> bool {
407        self.inner.database.has_unrecoverable_write_error()
408    }
409}
410
411fn get_next_free_task_id(batch: &TurboWriteBatch<'_>) -> Result<u32, anyhow::Error> {
412    Ok(
413        match batch.get(
414            KeySpace::Infra,
415            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
416        )? {
417            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
418            None => 1,
419        },
420    )
421}
422
423fn save_infra(
424    batch: &TurboWriteBatch<'_>,
425    next_task_id: u32,
426    operations: Vec<Arc<AnyOperation>>,
427) -> Result<(), anyhow::Error> {
428    batch
429        .put(
430            KeySpace::Infra,
431            WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
432            WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
433        )
434        .context("Unable to write next free task id")?;
435    {
436        let _span =
437            tracing::trace_span!("update operations", operations = operations.len()).entered();
438        let operations =
439            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
440        batch
441            .put(
442                KeySpace::Infra,
443                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
444                WriteBuffer::SmallVec(operations),
445            )
446            .context("Unable to write operations")?;
447    }
448    // Safety: save_infra is called after all concurrent writes to Infra are done.
449    unsafe { batch.flush(KeySpace::Infra)? };
450    Ok(())
451}
452
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{turbo::TurboKeyValueDatabase, write_batch::WriteBuffer};

    /// Commits a single `hash -> task_id` task cache entry through its own write batch.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let key = hash.to_le_bytes();
        let value = (*task_id).to_le_bytes();
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&key),
            WriteBuffer::Borrowed(&value),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Turns a stored task cache value (four little-endian bytes) back into a [`TaskId`].
    fn decode_task_id(bytes: &[u8]) -> TaskId {
        let raw: [u8; 4] = bytes.try_into().unwrap();
        TaskId::try_from(u32::from_le_bytes(raw)).unwrap()
    }

    /// Checks that `get_multiple` yields every [`TaskId`] written under one hash key, which is
    /// what a hash collision looks like at the database layer.
    ///
    /// This is a lower-level test: it exercises the database directly rather than going through
    /// the backing storage.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;

        // Use is_short_session=true to disable background compaction (which requires turbo-tasks
        // context)
        let db = TurboKeyValueDatabase::new(tempdir.path().to_path_buf(), false, true, false)?;

        // Three distinct task ids, all stored under the same hash key.
        let collision_hash: u64 = 0xDEADBEEF;
        let expected_ids = [100u32, 200, 300].map(|raw| TaskId::try_from(raw).unwrap());

        // Each write creates a new SST file, so all three will be returned by get_multiple
        for task_id in expected_ids {
            write_task_cache_entry(&db, collision_hash, task_id)?;
        }

        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;

        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        // The result order is not asserted on, so sort before comparing.
        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| decode_task_id(Borrow::<[u8]>::borrow(bytes)))
            .collect();
        found_ids.sort_by_key(|id| **id);

        assert_eq!(found_ids, expected_ids.to_vec());

        db.shutdown()?;
        Ok(())
    }

    /// Checks that many distinct keys written in one batch, flushed and committed, can all be
    /// read back after reopening the database. This follows the `save_snapshot` pattern: write
    /// many TaskCache entries, flush, commit.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let n = 100_000;
        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
        let task_ids: Vec<TaskId> = (1..=n as u32)
            .map(|i| TaskId::try_from(i).unwrap())
            .collect();

        // Phase 1: write everything in a single batch with flush (like save_snapshot does).
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let batch = db.write_batch()?;

            for (hash, task_id) in hashes.iter().zip(&task_ids) {
                let key = hash.to_le_bytes();
                let value = (**task_id).to_le_bytes();
                batch.put(
                    KeySpace::TaskCache,
                    WriteBuffer::Borrowed(&key),
                    WriteBuffer::Borrowed(&value),
                )?;
            }
            // Flush TaskCache (like the new code does)
            unsafe { batch.flush(KeySpace::TaskCache) }?;
            batch.commit()?;

            db.shutdown()?;
        }

        // Phase 2: reopen and verify that every entry is readable.
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let mut found = 0;
            let mut missing = 0;
            for (hash, expected_id) in hashes.iter().zip(&task_ids) {
                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
                if results.is_empty() {
                    missing += 1;
                    continue;
                }
                found += 1;
                let id = decode_task_id(Borrow::<[u8]>::borrow(&results[0]));
                assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
            }
            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
            db.shutdown()?;
        }

        Ok(())
    }
}