1use std::{
2 borrow::Borrow,
3 env,
4 path::PathBuf,
5 sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use smallvec::SmallVec;
10use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode};
11use turbo_tasks::{
12 TaskId,
13 backend::CachedTaskType,
14 panic_hooks::{PanicHookGuard, register_panic_hook},
15 parallel,
16};
17use turbo_tasks_hash::Xxh3Hash64Hasher;
18
19use crate::{
20 GitVersionInfo,
21 backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
22 backing_storage::{BackingStorage, BackingStorageSealed, SnapshotItem},
23 database::{
24 db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
25 db_versioning::handle_db_versioning,
26 key_value_database::{KeySpace, KeyValueDatabase},
27 write_batch::{ConcurrentWriteBatch, WriteBuffer},
28 },
29 db_invalidation::invalidation_reasons,
30 utils::chunked_vec::ChunkedVec,
31};
32
/// `Infra` key space key storing the serialized list of uncompleted operations.
const META_KEY_OPERATIONS: u32 = 0;
/// `Infra` key space key storing the next free task id counter.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
35
/// A `u32` encoded as little-endian bytes, used as a fixed-size database key.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` into its little-endian byte representation.
    fn new(value: u32) -> Self {
        IntKey(u32::to_le_bytes(value))
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key as a byte slice for database APIs.
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
49
50fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
51 let n = u32::from_le_bytes(bytes.borrow().try_into()?);
52 Ok(n)
53}
54
/// Decides (once, lazily) whether a panic should invalidate the on-disk cache.
///
/// Disabled when either `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or
/// `__NEXT_TEST_MODE` is set to a truthy value.
fn should_invalidate_on_panic() -> bool {
    // A variable counts as falsy when it is unset, empty, "0", or "false".
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value),
        }
    }
    // Computed a single time and cached for the lifetime of the process.
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
73
/// Shared state behind [`KeyValueDatabaseBackingStorage`], held in an `Arc` so
/// a registered panic hook can reference it weakly.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    // `None` for in-memory databases; invalidation is then a no-op.
    base_path: Option<PathBuf>,
    // Set to `true` once the database has been invalidated, making
    // `invalidate` idempotent.
    invalidated: Mutex<bool>,
    // Keeps the panic hook registered for as long as this struct is alive.
    _panic_hook_guard: Option<PanicHookGuard>,
}
85
/// A backing storage implementation built on top of any [`KeyValueDatabase`].
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
90
impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    /// Wraps an in-memory database. No base path is recorded, so
    /// invalidation is a no-op and no panic hook is installed.
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Opens an on-disk database under `base_path`: first checks for (and
    /// cleans up after) a pending invalidation marker, then resolves the
    /// version-specific directory, then opens the database there.
    ///
    /// Returns the storage together with the startup cache state reported by
    /// the invalidation check. When invalidate-on-panic is enabled, also
    /// registers a global panic hook that invalidates this database.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            // `new_cyclic` lets the panic hook capture only a `Weak` to the
            // inner state, so the hook does not keep the storage alive after
            // it has been dropped.
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // Best effort: errors cannot be reported from
                            // inside a panic hook.
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}
161
impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    /// Marks the on-disk database as invalid (so it is discarded on the next
    /// startup) and prevents further writes. Idempotent; a no-op for
    /// in-memory databases, which have no `base_path`.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
            // This can run from a panic hook, so recover from a poisoned
            // lock: the guarded bool stays consistent even if a previous
            // holder panicked.
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            // Write the invalidation marker first, then stop further writes;
            // the flag is only set once both steps have succeeded.
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

    /// Reads a `u32` stored under `key` in the `Infra` key space, or `None`
    /// when the key is absent.
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        self.database
            .get(KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}
194
impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    /// Delegates to `KeyValueDatabaseBackingStorageInner::invalidate`.
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}
202
203impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
204 for KeyValueDatabaseBackingStorage<T>
205{
206 fn next_free_task_id(&self) -> Result<TaskId> {
207 Ok(self
208 .inner
209 .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
210 .context("Unable to read next free task id from database")?
211 .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
212 }
213
214 fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
215 fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
216 let Some(operations) =
217 database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())?
218 else {
219 return Ok(Vec::new());
220 };
221 let operations = turbo_bincode_decode(operations.borrow())?;
222 Ok(operations)
223 }
224 get(&self.inner.database).context("Unable to read uncompleted operations from database")
225 }
226
227 fn save_snapshot<I>(
228 &self,
229 operations: Vec<Arc<AnyOperation>>,
230 task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
231 snapshots: Vec<I>,
232 ) -> Result<()>
233 where
234 I: IntoIterator<Item = SnapshotItem> + Send + Sync,
235 {
236 let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
237 let batch = self.inner.database.write_batch()?;
238
239 {
240 let _span = tracing::trace_span!("update task data").entered();
241 process_task_data(snapshots, &batch)?;
242 let span = tracing::trace_span!("flush task data").entered();
243 parallel::try_for_each(&[KeySpace::TaskMeta, KeySpace::TaskData], |&key_space| {
244 let _span = span.clone().entered();
245 unsafe { batch.flush(key_space) }
250 })?;
251 }
252
253 let mut next_task_id = get_next_free_task_id(&batch)?;
254
255 {
256 let _span = tracing::trace_span!(
257 "update task cache",
258 items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
259 )
260 .entered();
261 let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
262 task_cache_updates,
263 |updates| {
264 let _span = _span.clone().entered();
265 let mut max_task_id = 0;
266 for (task_type, task_id) in updates {
267 let hash = compute_task_type_hash(&task_type);
268 let task_id: u32 = *task_id;
269
270 batch
271 .put(
272 KeySpace::TaskCache,
273 WriteBuffer::Borrowed(&hash.to_le_bytes()),
274 WriteBuffer::Borrowed(&task_id.to_le_bytes()),
275 )
276 .with_context(|| {
277 format!("Unable to write task cache {task_type:?} => {task_id}")
278 })?;
279 max_task_id = max_task_id.max(task_id);
280 }
281
282 Ok(max_task_id)
283 },
284 )?
285 .into_iter()
286 .max()
287 .unwrap_or(0);
288 next_task_id = next_task_id.max(max_task_id + 1);
289 }
290
291 save_infra(&batch, next_task_id, operations)?;
292
293 {
294 let _span = tracing::trace_span!("commit").entered();
295 batch.commit().context("Unable to commit operations")?;
296 }
297 Ok(())
298 }
299
300 fn lookup_task_candidates(&self, task_type: &CachedTaskType) -> Result<SmallVec<[TaskId; 1]>> {
301 let inner = &*self.inner;
302 if inner.database.is_empty() {
303 return Ok(SmallVec::new());
306 }
307 let hash = compute_task_type_hash(task_type);
308 let buffers = inner
309 .database
310 .get_multiple(KeySpace::TaskCache, &hash.to_le_bytes())
311 .with_context(|| {
312 format!("Looking up task id for {task_type:?} from database failed")
313 })?;
314
315 let mut task_ids = SmallVec::with_capacity(buffers.len());
316 for bytes in buffers {
317 let bytes = bytes.borrow().try_into()?;
318 let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
319 task_ids.push(id);
320 }
321 Ok(task_ids)
322 }
323
324 fn lookup_data(
325 &self,
326 task_id: TaskId,
327 category: SpecificTaskDataCategory,
328 storage: &mut TaskStorage,
329 ) -> Result<()> {
330 let inner = &*self.inner;
331 let Some(bytes) = inner
332 .database
333 .get(category.key_space(), IntKey::new(*task_id).as_ref())
334 .with_context(|| {
335 format!("Looking up task storage for {task_id} from database failed")
336 })?
337 else {
338 return Ok(());
339 };
340 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
341 storage
342 .decode(category, &mut decoder)
343 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
344 }
345
346 fn batch_lookup_data(
347 &self,
348 task_ids: &[TaskId],
349 category: SpecificTaskDataCategory,
350 ) -> Result<Vec<TaskStorage>> {
351 let inner = &*self.inner;
352 let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
353 let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
354 let bytes = inner
355 .database
356 .batch_get(category.key_space(), &keys)
357 .with_context(|| {
358 format!(
359 "Looking up typed data for {} tasks from database failed",
360 task_ids.len()
361 )
362 })?;
363 bytes
364 .into_iter()
365 .map(|opt_bytes| {
366 let mut storage = TaskStorage::new();
367 if let Some(bytes) = opt_bytes {
368 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
369 storage
370 .decode(category, &mut decoder)
371 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
372 }
373 Ok(storage)
374 })
375 .collect::<Result<Vec<_>>>()
376 }
377
378 fn compact(&self) -> Result<bool> {
379 self.inner.database.compact()
380 }
381
382 fn shutdown(&self) -> Result<()> {
383 self.inner.database.shutdown()
384 }
385}
386
387fn get_next_free_task_id<'a>(batch: &impl ConcurrentWriteBatch<'a>) -> Result<u32, anyhow::Error> {
388 Ok(
389 match batch.get(
390 KeySpace::Infra,
391 IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
392 )? {
393 Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
394 None => 1,
395 },
396 )
397}
398
/// Writes the infra metadata of a snapshot — the next free task id and the
/// serialized uncompleted operations — then flushes the `Infra` key space.
fn save_infra<'a>(
    batch: &impl ConcurrentWriteBatch<'a>,
    next_task_id: u32,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error> {
    batch
        .put(
            KeySpace::Infra,
            WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
            WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
        )
        .context("Unable to write next free task id")?;
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations =
            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .context("Unable to write operations")?;
    }
    // NOTE(review): `flush` is unsafe in this API; presumably it requires
    // that all writes to this key space are finished, which the sequential
    // puts above ensure — confirm against the `ConcurrentWriteBatch::flush`
    // contract.
    unsafe { batch.flush(KeySpace::Infra)? };
    Ok(())
}
428
429fn compute_task_type_hash(task_type: &CachedTaskType) -> u64 {
434 let mut hasher = Xxh3Hash64Hasher::new();
435 task_type.hash_encode(&mut hasher);
436 let hash = hasher.finish();
437 if cfg!(feature = "verify_serialization") {
438 task_type.hash_encode(&mut hasher);
439 let hash2 = hasher.finish();
440 assert_eq!(
441 hash, hash2,
442 "Hashing TaskType twice was non-deterministic: \n{:?}\ngot hashes {} != {}",
443 task_type, hash, hash2
444 );
445 }
446 hash
447}
448
/// Writes every `SnapshotItem`'s serialized meta and data payloads into the
/// write batch, processing the per-chunk `tasks` collections in parallel.
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: &B,
) -> Result<()>
where
    I: IntoIterator<Item = SnapshotItem> + Send + Sync,
{
    parallel::try_for_each_owned(tasks, |tasks| {
        for SnapshotItem {
            task_id,
            meta,
            data,
        } in tasks
        {
            // Both key spaces are keyed by the task id's little-endian bytes.
            let key = IntKey::new(*task_id);
            let key = key.as_ref();
            if let Some(meta) = meta {
                batch.put(
                    KeySpace::TaskMeta,
                    WriteBuffer::Borrowed(key),
                    WriteBuffer::SmallVec(meta),
                )?;
            }
            if let Some(data) = data {
                batch.put(
                    KeySpace::TaskData,
                    WriteBuffer::Borrowed(key),
                    WriteBuffer::SmallVec(data),
                )?;
            }
        }
        Ok(())
    })
}
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use turbo_tasks::TaskId;

    use super::*;
    use crate::database::{
        key_value_database::KeyValueDatabase,
        turbo::TurboKeyValueDatabase,
        write_batch::{ConcurrentWriteBatch, WriteBuffer},
    };

    /// Writes a single `TaskCache` entry mapping `hash` to `task_id` and
    /// commits it immediately.
    fn write_task_cache_entry(
        db: &TurboKeyValueDatabase,
        hash: u64,
        task_id: TaskId,
    ) -> Result<()> {
        let batch = db.write_batch()?;
        batch.put(
            KeySpace::TaskCache,
            WriteBuffer::Borrowed(&hash.to_le_bytes()),
            WriteBuffer::Borrowed(&(*task_id).to_le_bytes()),
        )?;
        batch.commit()?;
        Ok(())
    }

    /// Stores three distinct task ids under the same 64-bit hash and checks
    /// that `get_multiple` surfaces every candidate — task-type hash
    /// collisions must not drop entries.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_hash_collision_returns_multiple_candidates() -> Result<()> {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        let db = TurboKeyValueDatabase::new(path.to_path_buf(), false, true, false)?;

        // All three ids are written under this single colliding hash key.
        let collision_hash: u64 = 0xDEADBEEF;
        let task_id_1 = TaskId::try_from(100u32).unwrap();
        let task_id_2 = TaskId::try_from(200u32).unwrap();
        let task_id_3 = TaskId::try_from(300u32).unwrap();

        write_task_cache_entry(&db, collision_hash, task_id_1)?;
        write_task_cache_entry(&db, collision_hash, task_id_2)?;
        write_task_cache_entry(&db, collision_hash, task_id_3)?;

        let results = db.get_multiple(KeySpace::TaskCache, &collision_hash.to_le_bytes())?;

        assert_eq!(
            results.len(),
            3,
            "Should return all 3 task IDs for the colliding hash"
        );

        // Decode and sort so the assertion is independent of storage order.
        let mut found_ids: Vec<TaskId> = results
            .iter()
            .map(|bytes| {
                let bytes: [u8; 4] = Borrow::<[u8]>::borrow(bytes).try_into().unwrap();
                TaskId::try_from(u32::from_le_bytes(bytes)).unwrap()
            })
            .collect();
        found_ids.sort_by_key(|id| **id);

        assert_eq!(found_ids, vec![task_id_1, task_id_2, task_id_3]);

        db.shutdown()?;
        Ok(())
    }
}