1use std::{
2 borrow::Borrow,
3 env,
4 path::PathBuf,
5 sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12 DynTaskInputs, RawVc, TaskId,
13 macro_helpers::NativeFunction,
14 panic_hooks::{PanicHookGuard, register_panic_hook},
15 parallel,
16};
17
18use crate::{
19 GitVersionInfo,
20 backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
21 backing_storage::{SnapshotItem, SnapshotMeta, compute_task_type_hash_from_components},
22 database::{
23 db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
24 db_versioning::handle_db_versioning,
25 key_value_database::KeySpace,
26 turbo::{TurboKeyValueDatabase, TurboWriteBatch},
27 write_batch::WriteBuffer,
28 },
29 db_invalidation::invalidation_reasons,
30};
31
/// Key (in [`KeySpace::Infra`]) under which the encoded list of uncompleted operations is
/// stored by `save_infra` and read back by `uncompleted_operations`.
const META_KEY_OPERATIONS: u32 = 0;
/// Key (in [`KeySpace::Infra`]) under which the next free task id is stored as a
/// little-endian `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
34
/// A database key holding the 4-byte little-endian encoding of a `u32`.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as little-endian bytes.
    fn new(value: u32) -> Self {
        let encoded = u32::to_le_bytes(value);
        IntKey(encoded)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Borrows the encoded key bytes.
    fn as_ref(&self) -> &[u8] {
        let IntKey(encoded) = self;
        encoded.as_slice()
    }
}
48
49fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
50 let n = u32::from_le_bytes(bytes.borrow().try_into()?);
51 Ok(n)
52}
53
/// Reports whether a panic should invalidate the persistent cache.
///
/// Invalidation is enabled unless `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or
/// `__NEXT_TEST_MODE` is set to something other than an empty string, `0`, or `false`.
/// The environment is read once; the answer is cached for the rest of the process.
fn should_invalidate_on_panic() -> bool {
    /// `true` when `key` is unset, empty, `"0"`, or `"false"`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }

    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        let skip_requested = !env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC");
        let in_test_mode = !env_is_falsy("__NEXT_TEST_MODE");
        !(skip_requested || in_test_mode)
    });

    *SHOULD_INVALIDATE
}
72
/// Shared state behind `TurboBackingStorage`.
struct TurboBackingStorageInner {
    /// The underlying key-value database that all reads and writes go through.
    database: TurboKeyValueDatabase,
    /// Root directory of the on-disk cache. `None` for storage created with
    /// `new_in_memory`; in that case `invalidate` does nothing.
    base_path: Option<PathBuf>,
    /// Becomes `true` once `invalidate` has succeeded, so later calls return early.
    /// The mutex serializes concurrent invalidation attempts.
    invalidated: Mutex<bool>,
    /// Guard returned by `register_panic_hook` when a hook was installed; never read.
    /// NOTE(review): presumably held so the hook stays registered for as long as this
    /// value lives — confirm against `PanicHookGuard`.
    _panic_hook_guard: Option<PanicHookGuard>,
}
84
/// Backing storage for turbo-tasks built on a `TurboKeyValueDatabase`.
///
/// Created with `new_in_memory` or `open_versioned_on_disk`. The state lives in an `Arc`.
/// This lets the panic hook installed by `open_versioned_on_disk` hold only a `Weak`
/// reference to it.
pub struct TurboBackingStorage {
    inner: Arc<TurboBackingStorageInner>,
}
96
97impl TurboBackingStorage {
98 pub(crate) fn new_in_memory(database: TurboKeyValueDatabase) -> Self {
99 Self {
100 inner: Arc::new(TurboBackingStorageInner {
101 database,
102 base_path: None,
103 invalidated: Mutex::new(false),
104 _panic_hook_guard: None,
105 }),
106 }
107 }
108
109 pub(crate) fn open_versioned_on_disk(
121 base_path: PathBuf,
122 version_info: &GitVersionInfo,
123 is_ci: bool,
124 database: impl FnOnce(PathBuf) -> Result<TurboKeyValueDatabase>,
125 ) -> Result<(Self, StartupCacheState)> {
126 let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
127 .context("Failed to check database invalidation and cleanup")?;
128 let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
129 .context("Failed to handle database versioning")?;
130 let database = (database)(versioned_path).context("Failed to open database")?;
131 let backing_storage = Self {
132 inner: Arc::new_cyclic(move |weak_inner: &Weak<TurboBackingStorageInner>| {
133 let panic_hook_guard = if should_invalidate_on_panic() {
134 let weak_inner = weak_inner.clone();
135 Some(register_panic_hook(Box::new(move |_| {
136 let Some(inner) = weak_inner.upgrade() else {
137 return;
138 };
139 let _ = inner.invalidate(invalidation_reasons::PANIC);
144 })))
145 } else {
146 None
147 };
148 TurboBackingStorageInner {
149 database,
150 base_path: Some(base_path),
151 invalidated: Mutex::new(false),
152 _panic_hook_guard: panic_hook_guard,
153 }
154 }),
155 };
156 Ok((backing_storage, startup_cache_state))
157 }
158}
159
160impl TurboBackingStorageInner {
161 fn invalidate(&self, reason_code: &str) -> Result<()> {
162 if let Some(base_path) = &self.base_path {
164 let mut invalidated_guard = self
167 .invalidated
168 .lock()
169 .unwrap_or_else(PoisonError::into_inner);
170 if *invalidated_guard {
171 return Ok(());
172 }
173 invalidate_db(base_path, reason_code)?;
177 self.database.prevent_writes();
178 *invalidated_guard = true;
180 }
181 Ok(())
182 }
183
184 fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
186 self.database
187 .get(KeySpace::Infra, IntKey::new(key).as_ref())?
188 .map(as_u32)
189 .transpose()
190 }
191}
192
193impl TurboBackingStorage {
194 pub(crate) fn invalidate(&self, reason_code: &str) -> Result<()> {
202 self.inner.invalidate(reason_code)
203 }
204
205 pub(crate) fn next_free_task_id(&self) -> Result<TaskId> {
206 Ok(self
207 .inner
208 .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
209 .context("Unable to read next free task id from database")?
210 .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
211 }
212
213 pub(crate) fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
214 fn get(database: &TurboKeyValueDatabase) -> Result<Vec<AnyOperation>> {
215 let Some(operations) =
216 database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
217 else {
218 return Ok(Vec::new());
219 };
220 let operations = turbo_bincode_decode(operations.borrow())?;
221 Ok(operations)
222 }
223 get(&self.inner.database).context("Unable to read uncompleted operations from database")
224 }
225
226 pub(crate) fn save_snapshot<I>(
227 &self,
228 operations: Vec<Arc<AnyOperation>>,
229 snapshots: Vec<I>,
230 ) -> Result<SnapshotMeta>
231 where
232 I: IntoIterator<Item = SnapshotItem> + Send + Sync,
233 {
234 let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
235 let batch = self.inner.database.write_batch()?;
236
237 {
238 let _span = tracing::trace_span!("update task data").entered();
239 let snapshot_meta =
240 parallel::map_collect_owned::<_, _, Result<Vec<_>>>(snapshots, |shard: I| {
241 let mut max_new_task_id = 0;
242 let mut data_items = 0;
243 let mut meta_items = 0;
244 let mut task_cache_items = 0;
245 for SnapshotItem {
246 task_id,
247 meta,
248 data,
249 task_type_hash,
250 } in shard
251 {
252 let key = IntKey::new(*task_id);
253 let key = key.as_ref();
254 if let Some(meta) = meta {
255 batch.put(
256 KeySpace::TaskMeta,
257 WriteBuffer::Borrowed(key),
258 WriteBuffer::SmallVec(meta),
259 )?;
260 meta_items += 1;
261 }
262 if let Some(data) = data {
263 batch.put(
264 KeySpace::TaskData,
265 WriteBuffer::Borrowed(key),
266 WriteBuffer::SmallVec(data),
267 )?;
268 data_items += 1;
269 }
270 if let Some(task_type_hash) = task_type_hash {
272 batch.put(
273 KeySpace::TaskCache,
274 WriteBuffer::Borrowed(&task_type_hash),
275 WriteBuffer::Borrowed(key),
276 )?;
277 task_cache_items += 1;
278 max_new_task_id = max_new_task_id.max(*task_id);
279 }
280 }
281 Ok(SnapshotMeta {
282 data_items,
283 meta_items,
284 task_cache_items,
285 max_next_task_id: max_new_task_id,
286 })
287 })?
288 .into_iter()
289 .reduce(|t1, t2| t1.merge(t2))
290 .unwrap_or_default();
291
292 let span = tracing::trace_span!("flush task data").entered();
293 parallel::try_for_each(
294 &[KeySpace::TaskMeta, KeySpace::TaskData, KeySpace::TaskCache],
295 |&key_space| {
296 let _span = span.clone().entered();
297 unsafe { batch.flush(key_space) }
300 },
301 )?;
302
303 let mut next_task_id = get_next_free_task_id(&batch)?;
304 next_task_id = next_task_id.max(snapshot_meta.max_next_task_id + 1);
305
306 save_infra(&batch, next_task_id, operations)?;
307 {
308 let _span = tracing::trace_span!("commit").entered();
309 batch.commit().context("Unable to commit operations")?;
310 }
311 Ok(snapshot_meta)
312 }
313 }
314
315 pub(crate) fn lookup_task_candidates(
316 &self,
317 native_fn: &'static NativeFunction,
318 this: Option<RawVc>,
319 arg: &dyn DynTaskInputs,
320 ) -> Result<SmallVec<[TaskId; 1]>> {
321 let inner = &*self.inner;
322 if inner.database.is_empty() {
323 return Ok(SmallVec::new());
326 }
327 let hash = compute_task_type_hash_from_components(native_fn, this, arg);
328 let buffers = inner
329 .database
330 .get_multiple(KeySpace::TaskCache, &hash)
331 .with_context(|| {
332 format!("Looking up task id for {native_fn:?}(this={this:?}) from database failed")
333 })?;
334
335 let mut task_ids = SmallVec::with_capacity(buffers.len());
336 for bytes in buffers {
337 let bytes = Borrow::<[u8]>::borrow(&bytes).try_into()?;
338 let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
339 task_ids.push(id);
340 }
341 Ok(task_ids)
342 }
343
344 pub(crate) fn lookup_data(
345 &self,
346 task_id: TaskId,
347 category: SpecificTaskDataCategory,
348 storage: &mut TaskStorage,
349 ) -> Result<()> {
350 let inner = &*self.inner;
351 let Some(bytes) = inner
352 .database
353 .get(category.key_space(), IntKey::new(*task_id).as_ref())
354 .with_context(|| {
355 format!("Looking up task storage for {task_id} from database failed")
356 })?
357 else {
358 return Ok(());
359 };
360 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
361 storage
362 .decode(category, &mut decoder)
363 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
364 }
365
366 pub(crate) fn batch_lookup_data(
367 &self,
368 task_ids: &[TaskId],
369 category: SpecificTaskDataCategory,
370 ) -> Result<Vec<TaskStorage>> {
371 let inner = &*self.inner;
372 let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
373 let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
374 let bytes = inner
375 .database
376 .batch_get(category.key_space(), &keys)
377 .with_context(|| {
378 format!(
379 "Looking up typed data for {} tasks from database failed",
380 task_ids.len()
381 )
382 })?;
383 bytes
384 .into_iter()
385 .map(|opt_bytes| {
386 let mut storage = TaskStorage::new();
387 if let Some(bytes) = opt_bytes {
388 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
389 storage
390 .decode(category, &mut decoder)
391 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
392 }
393 Ok(storage)
394 })
395 .collect::<Result<Vec<_>>>()
396 }
397
398 pub(crate) fn compact(&self) -> Result<bool> {
399 self.inner.database.compact()
400 }
401
402 pub(crate) fn shutdown(&self) -> Result<()> {
403 self.inner.database.shutdown()
404 }
405
406 pub(crate) fn has_unrecoverable_write_error(&self) -> bool {
407 self.inner.database.has_unrecoverable_write_error()
408 }
409}
410
411fn get_next_free_task_id(batch: &TurboWriteBatch<'_>) -> Result<u32, anyhow::Error> {
412 Ok(
413 match batch.get(
414 KeySpace::Infra,
415 IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
416 )? {
417 Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
418 None => 1,
419 },
420 )
421}
422
423fn save_infra(
424 batch: &TurboWriteBatch<'_>,
425 next_task_id: u32,
426 operations: Vec<Arc<AnyOperation>>,
427) -> Result<(), anyhow::Error> {
428 batch
429 .put(
430 KeySpace::Infra,
431 WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
432 WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
433 )
434 .context("Unable to write next free task id")?;
435 {
436 let _span =
437 tracing::trace_span!("update operations", operations = operations.len()).entered();
438 let operations =
439 turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
440 batch
441 .put(
442 KeySpace::Infra,
443 WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
444 WriteBuffer::SmallVec(operations),
445 )
446 .context("Unable to write operations")?;
447 }
448 unsafe { batch.flush(KeySpace::Infra)? };
450 Ok(())
451}
452
// Tests that drive the task-cache key space of `TurboKeyValueDatabase` directly, covering
// multiple values per hash key and persistence across a reopen.
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{turbo::TurboKeyValueDatabase, write_batch::WriteBuffer};

    /// Writes a single `hash -> task_id` entry into the task-cache key space.
    ///
    /// The entry goes into its own write batch, which is committed immediately. Both the
    /// key and the value are stored as little-endian bytes.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&hash.to_le_bytes()),
            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Three entries written under the same hash, each in a separate commit, must all be
    /// returned by `get_multiple`. This is how hash collisions surface as multiple
    /// candidates.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        // NOTE(review): the meaning of the three boolean flags is defined by
        // `TurboKeyValueDatabase::new` — confirm.
        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;

        let collision_hash: u64 = 0xDEADBEEF;
        let task_id_1 = TaskId::try_from(100u32).unwrap();
        let task_id_2 = TaskId::try_from(200u32).unwrap();
        let task_id_3 = TaskId::try_from(300u32).unwrap();

        write_task_cache_entry(&db, collision_hash, task_id_1)?;
        write_task_cache_entry(&db, collision_hash, task_id_2)?;
        write_task_cache_entry(&db, collision_hash, task_id_3)?;

        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;

        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        // Decode the returned values and sort them; the test does not assume any result order.
        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| {
                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
            })
            .collect();
        found_ids.sort_by_key(|id| **id);

        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);

        db.shutdown()?;
        Ok(())
    }

    /// Writes 100_000 task-cache entries in one batch, with an explicit flush before the
    /// commit. Then reopens the database and checks that every entry reads back with the
    /// expected task id.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_write_with_flush_and_reopen() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let n = 100_000;
        let hashes: Vec<u64> = (0..n).map(|i| 0x1000 + i as u64).collect();
        let task_ids: Vec<TaskId> = (1..=n as u32)
            .map(|i| TaskId::try_from(i).unwrap())
            .collect();

        {
            // First session: write all entries, flush, commit, and shut down.
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let batch = db.write_batch()?;

            for (hash, task_id) in hashes.iter().zip(task_ids.iter()) {
                batch.put(
                    KeySpace::TaskCache,
                    WriteBuffer::Borrowed(&hash.to_le_bytes()),
                    WriteBuffer::Borrowed(&(**task_id).to_le_bytes()),
                )?;
            }
            // SAFETY: NOTE(review) the contract of `TurboWriteBatch::flush` is defined
            // elsewhere. All puts above have completed on this thread before the flush.
            unsafe { batch.flush(KeySpace::TaskCache) }?;
            batch.commit()?;

            db.shutdown()?;
        }

        {
            // Second session: reopen from disk and verify every entry.
            let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;
            let mut found = 0;
            let mut missing = 0;
            for (hash, expected_id) in hashes.iter().zip(task_ids.iter()) {
                let results = db.get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())?;
                if results.is_empty() {
                    missing += 1;
                } else {
                    found += 1;
                    let bytes: [u8; 4] = Borrow::<[u8]>::borrow(&results[0]).try_into().unwrap();
                    let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
                    assert_eq!(id, *expected_id, "Task ID mismatch for hash {hash:#x}");
                }
            }
            assert_eq!(missing, 0, "Found {found}/{n} entries, missing {missing}");
            db.shutdown()?;
        }

        Ok(())
    }
}