1use std::{
2 borrow::Borrow,
3 env,
4 path::PathBuf,
5 sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12 DynTaskInputs, RawVc, TaskId,
13 macro_helpers::NativeFunction,
14 panic_hooks::{PanicHookGuard, register_panic_hook},
15 parallel,
16};
17
18use crate::{
19 GitVersionInfo,
20 backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
21 backing_storage::{
22 BackingStorage, BackingStorageSealed, SnapshotItem, compute_task_type_hash_from_components,
23 },
24 database::{
25 db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
26 db_versioning::handle_db_versioning,
27 key_value_database::{KeySpace, KeyValueDatabase},
28 write_batch::{ConcurrentWriteBatch, WriteBuffer},
29 },
30 db_invalidation::invalidation_reasons,
31};
32
// Well-known keys inside the `Infra` key space.
/// Key holding the serialized list of uncompleted operations from the last snapshot.
const META_KEY_OPERATIONS: u32 = 0;
/// Key holding the next free task id to hand out after restoring from this database.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
35
/// A fixed-size database key holding the little-endian encoding of a `u32`.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` into its 4-byte little-endian representation.
    fn new(value: u32) -> Self {
        let encoded = value.to_le_bytes();
        IntKey(encoded)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded bytes as a slice for use as a database key.
    fn as_ref(&self) -> &[u8] {
        let IntKey(bytes) = self;
        bytes
    }
}
49
50fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
51 let n = u32::from_le_bytes(bytes.borrow().try_into()?);
52 Ok(n)
53}
54
/// Whether a panic should invalidate the persisted database.
///
/// Enabled by default; disabled when either `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC`
/// or `__NEXT_TEST_MODE` is set to a non-falsy value. The decision is computed
/// once and cached for the lifetime of the process.
fn should_invalidate_on_panic() -> bool {
    // A variable counts as "falsy" when it is unset, empty, "0", or "false".
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => {
                let falsy_values: [&std::ffi::OsStr; 3] =
                    ["".as_ref(), "0".as_ref(), "false".as_ref()];
                falsy_values.contains(&&*value)
            }
        }
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
73
/// Shared state behind a [`KeyValueDatabaseBackingStorage`], held in an `Arc`
/// so the panic hook can reference it via a `Weak` pointer.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    // The underlying key-value database.
    database: T,
    // Directory containing the (versioned) database. `None` for in-memory
    // databases, for which invalidation is a no-op.
    base_path: Option<PathBuf>,
    // Set once the database has been invalidated so invalidation runs at most
    // once per process.
    invalidated: Mutex<bool>,
    // Keeps the invalidate-on-panic hook registered for as long as this value
    // is alive; `None` when the hook is disabled or the database is in-memory.
    _panic_hook_guard: Option<PanicHookGuard>,
}
85
/// A [`BackingStorage`] implementation layered on top of any
/// [`KeyValueDatabase`].
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
90
impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    /// Creates a backing storage around an in-memory database.
    ///
    /// No `base_path` is stored, so `invalidate` becomes a no-op, and no
    /// panic hook is registered.
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Opens an on-disk database underneath `base_path`.
    ///
    /// Runs invalidation-marker cleanup and version-directory handling first,
    /// then calls the `database` constructor with the resolved versioned
    /// path. When invalidate-on-panic is enabled, a process-wide panic hook
    /// is registered that invalidates this database; the hook captures only a
    /// `Weak` reference so it does not keep the storage alive, and the
    /// `PanicHookGuard` stored on the inner value ties the hook's
    /// registration to the storage's lifetime.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            // `new_cyclic` lets the panic hook capture a `Weak` pointer to
            // the very inner value that is being constructed here.
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            // If the storage is already gone there is nothing
                            // left to invalidate.
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // Best effort: a failure to invalidate while the
                            // process is panicking is deliberately ignored.
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}
161
impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    /// Marks the on-disk database as invalid so that it is cleaned up and
    /// rebuilt on the next startup.
    ///
    /// No-op for in-memory databases (no `base_path`) and when the database
    /// was already invalidated earlier in this process.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
            // Ignore mutex poisoning: the guarded value is a plain bool, so
            // it stays meaningful even if another thread panicked while
            // holding the lock (this path runs from a panic hook).
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            invalidate_db(base_path, reason_code)?;
            // Only block further writes after the invalidation marker was
            // written successfully; on error the flag stays false so a later
            // attempt can retry.
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

    /// Reads a `u32` stored under `key` in the `Infra` key space.
    ///
    /// Returns `Ok(None)` when the key is absent, and an error when the
    /// stored value is not exactly 4 bytes.
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        self.database
            .get(KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}
194
impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    /// Marks the database as invalid; see
    /// [`KeyValueDatabaseBackingStorageInner::invalidate`] for the details.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}
202
203impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
204 for KeyValueDatabaseBackingStorage<T>
205{
206 fn next_free_task_id(&self) -> Result<TaskId> {
207 Ok(self
208 .inner
209 .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
210 .context("Unable to read next free task id from database")?
211 .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
212 }
213
214 fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
215 fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
216 let Some(operations) =
217 database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
218 else {
219 return Ok(Vec::new());
220 };
221 let operations = turbo_bincode_decode(operations.borrow())?;
222 Ok(operations)
223 }
224 get(&self.inner.database).context("Unable to read uncompleted operations from database")
225 }
226
227 fn save_snapshot<I>(&self, operations: Vec<Arc<AnyOperation>>, snapshots: Vec<I>) -> Result<()>
228 where
229 I: IntoIterator<Item = SnapshotItem> + Send + Sync,
230 {
231 let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
232 let batch = self.inner.database.write_batch()?;
233
234 {
235 let _span = tracing::trace_span!("update task data").entered();
236 let max_new_task_id =
237 parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
238 let mut max_new_task_id = 0;
239 for SnapshotItem {
240 task_id,
241 meta,
242 data,
243 task_type_hash,
244 } in shard
245 {
246 let key = IntKey::new(*task_id);
247 let key = key.as_ref();
248 if let Some(meta) = meta {
249 batch.put(
250 KeySpace::TaskMeta,
251 WriteBuffer::Borrowed(key),
252 WriteBuffer::SmallVec(meta),
253 )?;
254 }
255 if let Some(data) = data {
256 batch.put(
257 KeySpace::TaskData,
258 WriteBuffer::Borrowed(key),
259 WriteBuffer::SmallVec(data),
260 )?;
261 }
262 if let Some(task_type_hash) = task_type_hash {
264 batch.put(
265 KeySpace::TaskCache,
266 WriteBuffer::Borrowed(&task_type_hash),
267 WriteBuffer::Borrowed(key),
268 )?;
269 max_new_task_id = max_new_task_id.max(*task_id);
270 }
271 }
272 Ok(max_new_task_id)
273 })?
274 .into_iter()
275 .max()
276 .unwrap_or_default();
277
278 let span = tracing::trace_span!("flush task data").entered();
279 parallel::try_for_each(
280 &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
281 |&key_space| {
282 let _span = span.clone().entered();
283 unsafe { batch.flush(key_space) }
286 },
287 )?;
288
289 let mut next_task_id = get_next_free_task_id(&batch)?;
290 next_task_id = next_task_id.max(max_new_task_id + 1);
291
292 save_infra(&batch, next_task_id, operations)?;
293 {
294 let _span = tracing::trace_span!("commit").entered();
295 batch.commit().context("Unable to commit operations")?;
296 }
297 Ok(())
298 }
299 }
300
301 fn lookup_task_candidates(
302 &self,
303 native_fn: &'static NativeFunction,
304 this: Option<RawVc>,
305 arg: &dyn DynTaskInputs,
306 ) -> Result<SmallVec<[TaskId; 1]>> {
307 let inner = &*self.inner;
308 if inner.database.is_empty() {
309 return Ok(SmallVec::new());
312 }
313 let hash = compute_task_type_hash_from_components(native_fn, this, arg);
314 let buffers = inner
315 .database
316 .get_multiple(KeySpace::TaskCache, &hash)
317 .with_context(|| {
318 format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
319 })?;
320
321 let mut task_ids = SmallVec::with_capacity(buffers.len());
322 for bytes in buffers {
323 let bytes = bytes.borrow().try_into()?;
324 let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
325 task_ids.push(id);
326 }
327 Ok(task_ids)
328 }
329
330 fn lookup_data(
331 &self,
332 task_id: TaskId,
333 category: SpecificTaskDataCategory,
334 storage: &mut TaskStorage,
335 ) -> Result<()> {
336 let inner = &*self.inner;
337 let Some(bytes) = inner
338 .database
339 .get(category.key_space(), IntKey::new(*task_id).as_ref())
340 .with_context(|| {
341 format!("Looking up task storage for {task_id} from database failed")
342 })?
343 else {
344 return Ok(());
345 };
346 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
347 storage
348 .decode(category, &mut decoder)
349 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
350 }
351
352 fn batch_lookup_data(
353 &self,
354 task_ids: &[TaskId],
355 category: SpecificTaskDataCategory,
356 ) -> Result<Vec<TaskStorage>> {
357 let inner = &*self.inner;
358 let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
359 let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
360 let bytes = inner
361 .database
362 .batch_get(category.key_space(), &keys)
363 .with_context(|| {
364 format!(
365 "Looking up typed data for {} tasks from database failed",
366 task_ids.len()
367 )
368 })?;
369 bytes
370 .into_iter()
371 .map(|opt_bytes| {
372 let mut storage = TaskStorage::new();
373 if let Some(bytes) = opt_bytes {
374 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
375 storage
376 .decode(category, &mut decoder)
377 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
378 }
379 Ok(storage)
380 })
381 .collect::<Result<Vec<_>>>()
382 }
383
384 fn compact(&self) -> Result<bool> {
385 self.inner.database.compact()
386 }
387
388 fn shutdown(&self) -> Result<()> {
389 self.inner.database.shutdown()
390 }
391
392 fn has_unrecoverable_write_error(&self) -> bool {
393 self.inner.database.has_unrecoverable_write_error()
394 }
395}
396
397fn get_next_free_task_id<'a>(batch: &impl ConcurrentWriteBatch<'a>) -> Result<u32, anyhow::Error> {
398 Ok(
399 match batch.get(
400 KeySpace::Infra,
401 IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
402 )? {
403 Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
404 None => 1,
405 },
406 )
407}
408
409fn save_infra<'a>(
410 batch: &impl ConcurrentWriteBatch<'a>,
411 next_task_id: u32,
412 operations: Vec<Arc<AnyOperation>>,
413) -> Result<(), anyhow::Error> {
414 batch
415 .put(
416 KeySpace::Infra,
417 WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
418 WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
419 )
420 .context("Unable to write next free task id")?;
421 {
422 let _span =
423 tracing::trace_span!("update operations", operations = operations.len()).entered();
424 let operations =
425 turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
426 batch
427 .put(
428 KeySpace::Infra,
429 WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
430 WriteBuffer::SmallVec(operations),
431 )
432 .context("Unable to write operations")?;
433 }
434 unsafe { batch.flush(KeySpace::Infra)? };
436 Ok(())
437}
438
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{
        key_value_database::KeyValueDatabase,
        turbo::TurboKeyValueDatabase,
        write_batch::{ConcurrentWriteBatch, WriteBuffer},
    };

    /// Writes a single `hash -> task_id` entry into the `TaskCache` key space
    /// and commits it in its own batch.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&hash.to_le_bytes()),
            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Several task ids stored under the same hash must all be returned by
    /// `get_multiple` (hash-collision candidates are not deduplicated away).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;

        // Three distinct task ids deliberately share a single hash value.
        let collision_hash: u64 = 0xDEADBEEF;
        let task_id_1 = TaskId::try_from(100u32).unwrap();
        let task_id_2 = TaskId::try_from(200u32).unwrap();
        let task_id_3 = TaskId::try_from(300u32).unwrap();

        write_task_cache_entry(&db, collision_hash, task_id_1)?;
        write_task_cache_entry(&db, collision_hash, task_id_2)?;
        write_task_cache_entry(&db, collision_hash, task_id_3)?;

        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;

        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        // Decode the stored ids (little-endian u32) and compare
        // order-independently by sorting first.
        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| {
                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
            })
            .collect();
        found_ids.sort_by_key(|id| **id);

        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);

        db.shutdown()?;
        Ok(())
    }

    /// Entries written through a flushed and committed batch must survive a
    /// database shutdown and a subsequent reopen.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let n = 100_000;
        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
        let task_ids: Vec<TaskId> = (1..=n as u32)
            .map(|i| TaskId::try_from(i).unwrap())
            .collect();

        // First session: write everything in one batch, flush, commit, close.
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let batch = db.write_batch()?;

            for (hash, task_id) in hashes.iter().zip(task_ids.iter()) {
                batch.put(
                    KeySpace::TaskCache,
                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
                )?;
            }
            // SAFETY: no other thread writes to this batch here — presumably
            // what `flush` requires; TODO confirm its contract.
            unsafe { batch.flush(KeySpace::TaskCache) }?;
            batch.commit()?;

            db.shutdown()?;
        }

        // Second session: reopen the same path and verify every entry.
        {
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let mut found = 0;
            let mut missing = 0;
            for (hash, expected_id) in hashes.iter().zip(task_ids.iter()) {
                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
                if results.is_empty() {
                    missing += 1;
                } else {
                    found += 1;
                    let bytes: [u8; 4] = Borrow::<[u8]>::borrow(&results[0]).try_into().unwrap();
                    let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
                    assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
                }
            }
            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
            db.shutdown()?;
        }

        Ok(())
    }
}