1use std::{
2 borrow::Borrow,
3 env,
4 path::PathBuf,
5 sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12 DynTaskInputs, RawVc, TaskId,
13 macro_helpers::NativeFunction,
14 panic_hooks::{PanicHookGuard, register_panic_hook},
15 parallel,
16};
17
18use crate::{
19 GitVersionInfo,
20 backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
21 backing_storage::{
22 BackingStorage, BackingStorageSealed, SnapshotItem, SnapshotMeta,
23 compute_task_type_hash_from_components,
24 },
25 database::{
26 db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
27 db_versioning::handle_db_versioning,
28 key_value_database::{KeySpace, KeyValueDatabase},
29 write_batch::{ConcurrentWriteBatch, WriteBuffer},
30 },
31 db_invalidation::invalidation_reasons,
32};
33
/// `KeySpace::Infra` key under which the bincode-encoded list of uncompleted operations is
/// stored by each snapshot.
const META_KEY_OPERATIONS: u32 = 0;
/// `KeySpace::Infra` key under which the next free task id is stored as a little-endian `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
36
/// A `u32` encoded as four little-endian bytes, used as a fixed-width database key.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as its little-endian byte representation.
    fn new(value: u32) -> Self {
        let encoded = u32::to_le_bytes(value);
        IntKey(encoded)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
50
51fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
52 let n = u32::from_le_bytes(bytes.borrow().try_into()?);
53 Ok(n)
54}
55
/// Returns whether a panic should invalidate the persistent cache.
///
/// This is enabled unless `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or `__NEXT_TEST_MODE` is set
/// to something other than an empty string, `0` or `false`. The environment is read only once;
/// the result is cached for the lifetime of the process.
fn should_invalidate_on_panic() -> bool {
    /// `true` when `key` is unset or set to `""`, `"0"` or `"false"`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }

    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        let skip_flag_unset = env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC");
        skip_flag_unset && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
74
/// Shared state behind [`KeyValueDatabaseBackingStorage`].
///
/// It is kept in an `Arc` so that the invalidate-on-panic hook can reach it through a `Weak`
/// reference without keeping it alive.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    /// The underlying key-value database.
    database: T,
    /// Root directory of the on-disk cache, or `None` for in-memory storage, in which case
    /// invalidation is a no-op.
    base_path: Option<PathBuf>,
    /// Set to `true` once the database has been invalidated so later invalidations are skipped.
    invalidated: Mutex<bool>,
    /// Keeps the invalidate-on-panic hook registered while this value lives.
    /// NOTE(review): presumably dropping the guard unregisters the hook; confirm in
    /// `turbo_tasks::panic_hooks`.
    _panic_hook_guard: Option<PanicHookGuard>,
}
86
/// [`BackingStorage`] implementation on top of a [`KeyValueDatabase`].
///
/// This is a thin handle around the reference-counted inner state.
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
91
92impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
96 pub(crate) fn new_in_memory(database: T) -> Self {
97 Self {
98 inner: Arc::new(KeyValueDatabaseBackingStorageInner {
99 database,
100 base_path: None,
101 invalidated: Mutex::new(false),
102 _panic_hook_guard: None,
103 }),
104 }
105 }
106
107 pub(crate) fn open_versioned_on_disk(
119 base_path: PathBuf,
120 version_info: &GitVersionInfo,
121 is_ci: bool,
122 database: impl FnOnce(PathBuf) -> Result<T>,
123 ) -> Result<(Self, StartupCacheState)>
124 where
125 T: Send + Sync + 'static,
126 {
127 let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
128 .context("Failed to check database invalidation and cleanup")?;
129 let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
130 .context("Failed to handle database versioning")?;
131 let database = (database)(versioned_path).context("Failed to open database")?;
132 let backing_storage = Self {
133 inner: Arc::new_cyclic(
134 move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
135 let panic_hook_guard = if should_invalidate_on_panic() {
136 let weak_inner = weak_inner.clone();
137 Some(register_panic_hook(Box::new(move |_| {
138 let Some(inner) = weak_inner.upgrade() else {
139 return;
140 };
141 let _ = inner.invalidate(invalidation_reasons::PANIC);
146 })))
147 } else {
148 None
149 };
150 KeyValueDatabaseBackingStorageInner {
151 database,
152 base_path: Some(base_path),
153 invalidated: Mutex::new(false),
154 _panic_hook_guard: panic_hook_guard,
155 }
156 },
157 ),
158 };
159 Ok((backing_storage, startup_cache_state))
160 }
161}
162
163impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
164 fn invalidate(&self, reason_code: &str) -> Result<()> {
165 if let Some(base_path) = &self.base_path {
167 let mut invalidated_guard = self
170 .invalidated
171 .lock()
172 .unwrap_or_else(PoisonError::into_inner);
173 if *invalidated_guard {
174 return Ok(());
175 }
176 invalidate_db(base_path, reason_code)?;
180 self.database.prevent_writes();
181 *invalidated_guard = true;
183 }
184 Ok(())
185 }
186
187 fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
189 self.database
190 .get(KeySpace::Infra, IntKey::new(key).as_ref())?
191 .map(as_u32)
192 .transpose()
193 }
194}
195
196impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
197 for KeyValueDatabaseBackingStorage<T>
198{
199 fn invalidate(&self, reason_code: &str) -> Result<()> {
200 self.inner.invalidate(reason_code)
201 }
202}
203
204impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
205 for KeyValueDatabaseBackingStorage<T>
206{
207 fn next_free_task_id(&self) -> Result<TaskId> {
208 Ok(self
209 .inner
210 .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
211 .context("Unable to read next free task id from database")?
212 .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
213 }
214
215 fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
216 fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
217 let Some(operations) =
218 database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
219 else {
220 return Ok(Vec::new());
221 };
222 let operations = turbo_bincode_decode(operations.borrow())?;
223 Ok(operations)
224 }
225 get(&self.inner.database).context("Unable to read uncompleted operations from database")
226 }
227
228 fn save_snapshot<I>(
229 &self,
230 operations: Vec<Arc<AnyOperation>>,
231 snapshots: Vec<I>,
232 ) -> Result<SnapshotMeta>
233 where
234 I: IntoIterator<Item = SnapshotItem> + Send + Sync,
235 {
236 let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
237 let batch = self.inner.database.write_batch()?;
238
239 {
240 let _span = tracing::trace_span!("update task data").entered();
241 let snapshot_meta =
242 parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
243 let mut max_new_task_id = 0;
244 let mut data_items = 0;
245 let mut meta_items = 0;
246 let mut task_cache_items = 0;
247 for SnapshotItem {
248 task_id,
249 meta,
250 data,
251 task_type_hash,
252 } in shard
253 {
254 let key = IntKey::new(*task_id);
255 let key = key.as_ref();
256 if let Some(meta) = meta {
257 batch.put(
258 KeySpace::TaskMeta,
259 WriteBuffer::Borrowed(key),
260 WriteBuffer::SmallVec(meta),
261 )?;
262 meta_items += 1;
263 }
264 if let Some(data) = data {
265 batch.put(
266 KeySpace::TaskData,
267 WriteBuffer::Borrowed(key),
268 WriteBuffer::SmallVec(data),
269 )?;
270 data_items += 1;
271 }
272 if let Some(task_type_hash) = task_type_hash {
274 batch.put(
275 KeySpace::TaskCache,
276 WriteBuffer::Borrowed(&task_type_hash),
277 WriteBuffer::Borrowed(key),
278 )?;
279 task_cache_items += 1;
280 max_new_task_id = max_new_task_id.max(*task_id);
281 }
282 }
283 Ok(SnapshotMeta {
284 data_items,
285 meta_items,
286 task_cache_items,
287 max_next_task_id: max_new_task_id,
288 })
289 })?
290 .into_iter()
291 .reduce(|t1, t2| t1.merge(t2))
292 .unwrap_or_default();
293
294 let span = tracing::trace_span!("flush task data").entered();
295 parallel::try_for_each(
296 &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
297 |&key_space| {
298 let _span = span.clone().entered();
299 unsafe { batch.flush(key_space) }
302 },
303 )?;
304
305 let mut next_task_id = get_next_free_task_id(&batch)?;
306 next_task_id = next_task_id.max(snapshot_meta.max_next_task_id + 1);
307
308 save_infra(&batch, next_task_id, operations)?;
309 {
310 let _span = tracing::trace_span!("commit").entered();
311 batch.commit().context("Unable to commit operations")?;
312 }
313 Ok(snapshot_meta)
314 }
315 }
316
317 fn lookup_task_candidates(
318 &self,
319 native_fn: &'static NativeFunction,
320 this: Option<RawVc>,
321 arg: &dyn DynTaskInputs,
322 ) -> Result<SmallVec<[TaskId; 1]>> {
323 let inner = &*self.inner;
324 if inner.database.is_empty() {
325 return Ok(SmallVec::new());
328 }
329 let hash = compute_task_type_hash_from_components(native_fn, this, arg);
330 let buffers = inner
331 .database
332 .get_multiple(KeySpace::TaskCache, &hash)
333 .with_context(|| {
334 format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
335 })?;
336
337 let mut task_ids = SmallVec::with_capacity(buffers.len());
338 for bytes in buffers {
339 let bytes = bytes.borrow().try_into()?;
340 let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
341 task_ids.push(id);
342 }
343 Ok(task_ids)
344 }
345
346 fn lookup_data(
347 &self,
348 task_id: TaskId,
349 category: SpecificTaskDataCategory,
350 storage: &mut TaskStorage,
351 ) -> Result<()> {
352 let inner = &*self.inner;
353 let Some(bytes) = inner
354 .database
355 .get(category.key_space(), IntKey::new(*task_id).as_ref())
356 .with_context(|| {
357 format!("Looking up task storage for {task_id} from database failed")
358 })?
359 else {
360 return Ok(());
361 };
362 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
363 storage
364 .decode(category, &mut decoder)
365 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
366 }
367
368 fn batch_lookup_data(
369 &self,
370 task_ids: &[TaskId],
371 category: SpecificTaskDataCategory,
372 ) -> Result<Vec<TaskStorage>> {
373 let inner = &*self.inner;
374 let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
375 let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
376 let bytes = inner
377 .database
378 .batch_get(category.key_space(), &keys)
379 .with_context(|| {
380 format!(
381 "Looking up typed data for {} tasks from database failed",
382 task_ids.len()
383 )
384 })?;
385 bytes
386 .into_iter()
387 .map(|opt_bytes| {
388 let mut storage = TaskStorage::new();
389 if let Some(bytes) = opt_bytes {
390 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
391 storage
392 .decode(category, &mut decoder)
393 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
394 }
395 Ok(storage)
396 })
397 .collect::<Result<Vec<_>>>()
398 }
399
400 fn compact(&self) -> Result<bool> {
401 self.inner.database.compact()
402 }
403
404 fn shutdown(&self) -> Result<()> {
405 self.inner.database.shutdown()
406 }
407
408 fn has_unrecoverable_write_error(&self) -> bool {
409 self.inner.database.has_unrecoverable_write_error()
410 }
411}
412
413fn get_next_free_task_id<'a>(batch: &impl ConcurrentWriteBatch<'a>) -> Result<u32, anyhow::Error> {
414 Ok(
415 match batch.get(
416 KeySpace::Infra,
417 IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
418 )? {
419 Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
420 None => 1,
421 },
422 )
423}
424
425fn save_infra<'a>(
426 batch: &impl ConcurrentWriteBatch<'a>,
427 next_task_id: u32,
428 operations: Vec<Arc<AnyOperation>>,
429) -> Result<(), anyhow::Error> {
430 batch
431 .put(
432 KeySpace::Infra,
433 WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
434 WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
435 )
436 .context("Unable to write next free task id")?;
437 {
438 let _span =
439 tracing::trace_span!("update operations", operations = operations.len()).entered();
440 let operations =
441 turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
442 batch
443 .put(
444 KeySpace::Infra,
445 WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
446 WriteBuffer::SmallVec(operations),
447 )
448 .context("Unable to write operations")?;
449 }
450 unsafe { batch.flush(KeySpace::Infra)? };
452 Ok(())
453}
454
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{
        key_value_database::KeyValueDatabase,
        turbo::TurboKeyValueDatabase,
        write_batch::{ConcurrentWriteBatch, WriteBuffer},
    };

    /// Writes a single `TaskCache` entry (`hash` -> `task_id`, both little-endian) in its own
    /// batch and commits it.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&hash.to_le_bytes()),
            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Three task ids written under the same hash must all be returned by `get_multiple`.
    /// `lookup_task_candidates` relies on this behavior to handle hash collisions.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;

        let collision_hash: u64 = 0xDEADBEEF;
        let task_id_1 = TaskId::try_from(100u32).unwrap();
        let task_id_2 = TaskId::try_from(200u32).unwrap();
        let task_id_3 = TaskId::try_from(300u32).unwrap();

        // Separate commits under the same key: every value must be kept, not overwritten.
        write_task_cache_entry(&db, collision_hash, task_id_1)?;
        write_task_cache_entry(&db, collision_hash, task_id_2)?;
        write_task_cache_entry(&db, collision_hash, task_id_3)?;

        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;

        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        // The order of the returned values is not asserted, so sort before comparing.
        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| {
                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
            })
            .collect();
        found_ids.sort_by_key(|id| **id);

        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);

        db.shutdown()?;
        Ok(())
    }

    /// Writes many `TaskCache` entries in one batch and flushes that key space explicitly
    /// before committing. It then reopens the database and checks that every entry reads back
    /// with the expected task id.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let n = 100_000;
        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
        let task_ids: Vec<TaskId> = (1..=n as u32)
            .map(|i| TaskId::try_from(i).unwrap())
            .collect();

        {
            // First session: write everything, flush, commit, and shut down.
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let batch = db.write_batch()?;

            for (hash, task_id) in hashes.iter().zip(task_ids.iter()) {
                batch.put(
                    KeySpace::TaskCache,
                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
                )?;
            }
            // SAFETY: all puts above have completed on this thread before the flush.
            unsafe { batch.flush(KeySpace::TaskCache) }?;
            batch.commit()?;

            db.shutdown()?;
        }

        {
            // Second session: reopen from disk and verify every entry survived.
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let mut found = 0;
            let mut missing = 0;
            for (hash, expected_id) in hashes.iter().zip(task_ids.iter()) {
                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
                if results.is_empty() {
                    missing += 1;
                } else {
                    found += 1;
                    let bytes: [u8; 4] = Borrow::<[u8]>::borrow(&results[0]).try_into().unwrap();
                    let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
                    assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
                }
            }
            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
            db.shutdown()?;
        }

        Ok(())
    }
}