1use std::{
2 borrow::Borrow,
3 env,
4 path::PathBuf,
5 sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_persistence::CommitStats;
12use turbo_tasks::{
13 DynTaskInputs, RawVc, TaskId,
14 macro_helpers::NativeFunction,
15 panic_hooks::{PanicHookGuard, register_panic_hook},
16 parallel,
17};
18
19use crate::{
20 GitVersionInfo,
21 backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
22 backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash_from_components},
23 database::{
24 db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
25 db_versioning::handle_db_versioning,
26 key_value_database::KeySpace,
27 turbo::{TurboKeyValueDatabase, TurboWriteBatch},
28 write_batch::WriteBuffer,
29 },
30 db_invalidation::invalidation_reasons,
31};
32
/// `KeySpace::Infra` key under which the bincode-encoded list of uncompleted
/// operations is stored (written by `save_infra`, read by
/// `TurboBackingStorage::uncompleted_operations`).
const META_KEY_OPERATIONS: u32 = 0;
/// `KeySpace::Infra` key under which the next free task id is stored as a
/// little-endian `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
35
/// Four-byte database key holding a `u32` in little-endian byte order.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as a little-endian key.
    fn new(value: u32) -> Self {
        let encoded = u32::to_le_bytes(value);
        IntKey(encoded)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Returns the encoded key bytes.
    fn as_ref(&self) -> &[u8] {
        &self.0[..]
    }
}
49
50fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
51 let n = u32::from_le_bytes(bytes.borrow().try_into()?);
52 Ok(n)
53}
54
/// Returns whether a panic should invalidate the on-disk cache.
///
/// This is `true` unless `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or
/// `__NEXT_TEST_MODE` is set to something other than `""`, `"0"` or `"false"`.
/// The environment is consulted once; the answer is cached for the rest of the
/// process.
fn should_invalidate_on_panic() -> bool {
    /// An unset variable, or one equal to `""`, `"0"` or `"false"`, is falsy.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => matches!(value.to_str(), Some("" | "0" | "false")),
        }
    }

    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        // `all` stops at the first truthy variable, matching `a && b`.
        ["TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC", "__NEXT_TEST_MODE"]
            .iter()
            .all(|key| env_is_falsy(key))
    });

    *SHOULD_INVALIDATE
}
73
/// Shared state behind `TurboBackingStorage`. The panic hook registered by
/// `TurboBackingStorage::open_versioned_on_disk` reaches it through a `Weak`.
///
/// NOTE(review): fields drop in declaration order, so `database` is dropped
/// before `_panic_hook_guard`; keep that in mind before reordering.
struct TurboBackingStorageInner {
    /// The underlying key-value database.
    database: TurboKeyValueDatabase,
    /// Cache root passed to `invalidate_db`. `None` for in-memory storage,
    /// which makes invalidation a no-op.
    base_path: Option<PathBuf>,
    /// Set to `true` after a successful invalidation so later calls return
    /// early. The lock is held for the whole invalidation, which serializes
    /// concurrent attempts.
    invalidated: Mutex<bool>,
    /// Guard returned by `register_panic_hook`, kept alive with the storage.
    /// Presumably dropping it unregisters the hook (confirm). `None` when no
    /// hook was registered.
    _panic_hook_guard: Option<PanicHookGuard>,
}
85
/// Backing storage for task metadata, task data, task-cache entries and infra
/// metadata, built on a `TurboKeyValueDatabase`. The database can be on disk
/// or in memory.
///
/// The state sits behind an `Arc` so the panic hook registered in
/// `open_versioned_on_disk` can hold a `Weak` reference to it.
pub struct TurboBackingStorage {
    inner: Arc<TurboBackingStorageInner>,
}
97
98impl TurboBackingStorage {
99 pub(crate) fn new_in_memory(database: TurboKeyValueDatabase) -> Self {
100 Self {
101 inner: Arc::new(TurboBackingStorageInner {
102 database,
103 base_path: None,
104 invalidated: Mutex::new(false),
105 _panic_hook_guard: None,
106 }),
107 }
108 }
109
110 pub(crate) fn open_versioned_on_disk(
122 base_path: PathBuf,
123 version_info: &GitVersionInfo,
124 is_ci: bool,
125 database: impl FnOnce(PathBuf) -> Result<TurboKeyValueDatabase>,
126 ) -> Result<(Self, StartupCacheState)> {
127 let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
128 .context("Failed to check database invalidation and cleanup")?;
129 let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
130 .context("Failed to handle database versioning")?;
131 let database = (database)(versioned_path).context("Failed to open database")?;
132 let backing_storage = Self {
133 inner: Arc::new_cyclic(move |weak_inner: &Weak<TurboBackingStorageInner>| {
134 let panic_hook_guard = if should_invalidate_on_panic() {
135 let weak_inner = weak_inner.clone();
136 Some(register_panic_hook(Box::new(move |_| {
137 let Some(inner) = weak_inner.upgrade() else {
138 return;
139 };
140 let _ = inner.invalidate(invalidation_reasons::PANIC);
145 })))
146 } else {
147 None
148 };
149 TurboBackingStorageInner {
150 database,
151 base_path: Some(base_path),
152 invalidated: Mutex::new(false),
153 _panic_hook_guard: panic_hook_guard,
154 }
155 }),
156 };
157 Ok((backing_storage, startup_cache_state))
158 }
159}
160
161impl TurboBackingStorageInner {
162 fn invalidate(&self, reason_code: &str) -> Result<()> {
163 if let Some(base_path) = &self.base_path {
165 let mut invalidated_guard = self
168 .invalidated
169 .lock()
170 .unwrap_or_else(PoisonError::into_inner);
171 if *invalidated_guard {
172 return Ok(());
173 }
174 invalidate_db(base_path, reason_code)?;
178 self.database.prevent_writes();
179 *invalidated_guard = true;
181 }
182 Ok(())
183 }
184
185 fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
187 self.database
188 .get(KeySpace::Infra, IntKey::new(key).as_ref())?
189 .map(as_u32)
190 .transpose()
191 }
192}
193
194impl TurboBackingStorage {
195 pub(crate) fn invalidate(&self, reason_code: &str) -> Result<()> {
203 self.inner.invalidate(reason_code)
204 }
205
206 pub(crate) fn next_free_task_id(&self) -> Result<TaskId> {
207 Ok(self
208 .inner
209 .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
210 .context("Unable to read next free task id from database")?
211 .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
212 }
213
214 pub(crate) fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
215 fn get(database: &TurboKeyValueDatabase) -> Result<Vec<AnyOperation>> {
216 let Some(operations) =
217 database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
218 else {
219 return Ok(Vec::new());
220 };
221 let operations = turbo_bincode_decode(operations.borrow())?;
222 Ok(operations)
223 }
224 get(&self.inner.database).context("Unable to read uncompleted operations from database")
225 }
226
227 pub(crate) fn save_snapshot<I>(
228 &self,
229 operations: Vec<Arc<AnyOperation>>,
230 snapshots: Vec<I>,
231 ) -> Result<SnapshotMeta>
232 where
233 I: IntoIterator<Item = SnapshotItem> + Send + Sync,
234 {
235 let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
236 let batch = self.inner.database.write_batch()?;
237
238 {
239 let _span = tracing::trace_span!("update task data").entered();
240 let mut snapshot_meta =
241 parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
242 let mut max_new_task_id = 0;
243 let mut data_items = 0;
244 let mut meta_items = 0;
245 let mut task_cache_items = 0;
246 for SnapshotItem {
247 task_id,
248 meta,
249 data,
250 task_type_hash,
251 } in shard
252 {
253 let key = IntKey::new(*task_id);
254 let key = key.as_ref();
255 if let Some(meta) = meta {
256 batch.put(
257 KeySpace::TaskMeta,
258 WriteBuffer::Borrowed(key),
259 WriteBuffer::SmallVec(meta),
260 )?;
261 meta_items += 1;
262 }
263 if let Some(data) = data {
264 batch.put(
265 KeySpace::TaskData,
266 WriteBuffer::Borrowed(key),
267 WriteBuffer::SmallVec(data),
268 )?;
269 data_items += 1;
270 }
271 if let Some(task_type_hash) = task_type_hash {
273 batch.put(
274 KeySpace::TaskCache,
275 WriteBuffer::Borrowed(&task_type_hash),
276 WriteBuffer::Borrowed(key),
277 )?;
278 task_cache_items += 1;
279 max_new_task_id = max_new_task_id.max(*task_id);
280 }
281 }
282 Ok(SnapshotMeta {
283 data_items,
284 meta_items,
285 task_cache_items,
286 bytes_written: 0,
289 bytes_deleted: 0,
290 max_next_task_id: max_new_task_id,
291 })
292 })?
293 .into_iter()
294 .reduce(|t1, t2| t1.merge(t2))
295 .unwrap_or_default();
296
297 let span = tracing::trace_span!("flush task data").entered();
298 parallel::try_for_each(
299 &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
300 |&key_space| {
301 let _span = span.clone().entered();
302 unsafe { batch.flush(key_space) }
305 },
306 )?;
307
308 let mut next_task_id = get_next_free_task_id(&batch)?;
309 next_task_id = next_task_id.max(snapshot_meta.max_next_task_id + 1);
310
311 save_infra(&batch, next_task_id, operations)?;
312 {
313 let _span = tracing::trace_span!("commit").entered();
314 let stats = batch.commit().context("Unable to commit operations")?;
317 snapshot_meta.bytes_written = stats.bytes_written;
318 snapshot_meta.bytes_deleted = stats.bytes_deleted;
319 }
320 Ok(snapshot_meta)
321 }
322 }
323
324 pub(crate) fn lookup_task_candidates(
325 &self,
326 native_fn: &'static NativeFunction,
327 this: Option<RawVc>,
328 arg: &dyn DynTaskInputs,
329 ) -> Result<SmallVec<[TaskId; 1]>> {
330 let inner = &*self.inner;
331 if inner.database.is_empty() {
332 return Ok(SmallVec::new());
335 }
336 let hash = compute_task_type_hash_from_components(native_fn, this, arg);
337 let buffers = inner
338 .database
339 .get_multiple(KeySpace::TaskCache, &hash)
340 .with_context(|| {
341 format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
342 })?;
343
344 let mut task_ids = SmallVec::with_capacity(buffers.len());
345 for bytes in buffers {
346 let bytes = Borrow::<[u8]>::borrow(&bytes).try_into()?;
347 let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
348 task_ids.push(id);
349 }
350 Ok(task_ids)
351 }
352
353 pub(crate) fn lookup_data(
354 &self,
355 task_id: TaskId,
356 category: SpecificTaskDataCategory,
357 storage: &mut TaskStorage,
358 ) -> Result<()> {
359 let inner = &*self.inner;
360 let Some(bytes) = inner
361 .database
362 .get(category.key_space(), IntKey::new(*task_id).as_ref())
363 .with_context(|| {
364 format!("Looking up task storage for {task_id} from database failed")
365 })?
366 else {
367 return Ok(());
368 };
369 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
370 storage
371 .decode(category, &mut decoder)
372 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
373 }
374
375 pub(crate) fn batch_lookup_data(
376 &self,
377 task_ids: &[TaskId],
378 category: SpecificTaskDataCategory,
379 ) -> Result<Vec<TaskStorage>> {
380 let inner = &*self.inner;
381 let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
382 let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
383 let bytes = inner
384 .database
385 .batch_get(category.key_space(), &keys)
386 .with_context(|| {
387 format!(
388 "Looking up typed data for {} tasks from database failed",
389 task_ids.len()
390 )
391 })?;
392 bytes
393 .into_iter()
394 .map(|opt_bytes| {
395 let mut storage = TaskStorage::new();
396 if let Some(bytes) = opt_bytes {
397 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
398 storage
399 .decode(category, &mut decoder)
400 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
401 }
402 Ok(storage)
403 })
404 .collect::<Result<Vec<_>>>()
405 }
406
407 pub(crate) fn compact(&self) -> Result<Option<CommitStats>> {
408 self.inner.database.compact()
409 }
410
411 pub(crate) fn shutdown(&self) -> Result<()> {
412 self.inner.database.shutdown()
413 }
414
415 pub(crate) fn has_unrecoverable_write_error(&self) -> bool {
416 self.inner.database.has_unrecoverable_write_error()
417 }
418}
419
420fn get_next_free_task_id(batch: &TurboWriteBatch<'_>) -> Result<u32, anyhow::Error> {
421 Ok(
422 match batch.get(
423 KeySpace::Infra,
424 IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
425 )? {
426 Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
427 None => 1,
428 },
429 )
430}
431
432fn save_infra(
433 batch: &TurboWriteBatch<'_>,
434 next_task_id: u32,
435 operations: Vec<Arc<AnyOperation>>,
436) -> Result<(), anyhow::Error> {
437 batch
438 .put(
439 KeySpace::Infra,
440 WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
441 WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
442 )
443 .context("Unable to write next free task id")?;
444 {
445 let _span =
446 tracing::trace_span!("update operations", operations = operations.len()).entered();
447 let operations =
448 turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
449 batch
450 .put(
451 KeySpace::Infra,
452 WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
453 WriteBuffer::SmallVec(operations),
454 )
455 .context("Unable to write operations")?;
456 }
457 unsafe { batch.flush(KeySpace::Infra)? };
459 Ok(())
460}
461
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{turbo::TurboKeyValueDatabase, write_batch::WriteBuffer};

    /// Commits a single `TaskCache` entry mapping `hash` to `task_id`, using a
    /// dedicated write batch for each call.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let key = hash.to_le_bytes();
        let value = (*task_id).to_le_bytes();
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&key),
            WriteBuffer::Borrowed(&value),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Decodes a `TaskCache` value (little-endian `u32`) into a `TaskId`.
    /// Panics on malformed data, which is acceptable in test code.
    fn decode_task_id(bytes: &[u8]) -> TaskId {
        let raw: [u8; 4] = bytes.try_into().unwrap();
        TaskId::try_from(u32::from_le_bytes(raw)).unwrap()
    }

    /// `get_multiple` must return every value written under a single
    /// `TaskCache` key, as happens when task type hashes collide.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let db = TurboKeyValueDatabase::new(tempdir.path().to_path_buf(), false, true, false)?;

        let collision_hash: u64 = 0xDEADBEEF;
        let expected_ids: Vec<TaskId> = [100u32, 200, 300]
            .iter()
            .map(|&raw| TaskId::try_from(raw).unwrap())
            .collect();

        for &task_id in &expected_ids {
            write_task_cache_entry(&db, collision_hash, task_id)?;
        }

        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;
        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| decode_task_id(Borrow::<[u8]>::borrow(bytes)))
            .collect();
        found_ids.sort_by_key(|id| **id);
        assert_eq!(found_ids, expected_ids);

        db.shutdown()?;
        Ok(())
    }

    /// A large batch flushed explicitly before commit must be fully readable
    /// after the database is reopened.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let n = 100_000;
        let entries: Vec<(u64, TaskId)> = (0..n)
            .map(|i| (0x1000 + i as u64, TaskId::try_from(i as u32 + 1).unwrap()))
            .collect();

        // Write phase: one batch, explicit flush, commit, clean shutdown.
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let batch = db.write_batch()?;
            for (hash, task_id) in &entries {
                batch.put(
                    KeySpace::TaskCache,
                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
                )?;
            }
            // SAFETY: NOTE(review): all puts have completed and nothing else
            // uses this batch. Confirm against `TurboWriteBatch::flush`'s contract.
            unsafe { batch.flush(KeySpace::TaskCache) }?;
            batch.commit()?;
            db.shutdown()?;
        }

        // Read phase: reopen and check that every hash maps back to its task id.
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let mut found = 0;
            let mut missing = 0;
            for (hash, expected_id) in &entries {
                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
                if results.is_empty() {
                    missing += 1;
                    continue;
                }
                found += 1;
                let id = decode_task_id(Borrow::<[u8]>::borrow(&results[0]));
                assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
            }
            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
            db.shutdown()?;
        }

        Ok(())
    }
}