use std::{
    borrow::Borrow,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result, anyhow};
use turbo_bincode::{
    TurboBincodeBuffer, turbo_bincode_decode, turbo_bincode_encode, turbo_bincode_encode_into,
};
use turbo_tasks::{
    TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    parallel,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;

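/// Task ids and infra keys are stored as little-endian `u32` bytes. A minimal
/// sketch of the round-trip through `IntKey` and `as_u32` below:
///
/// ```text
/// let key = IntKey::new(42);
/// assert_eq!(key.as_ref(), &[42, 0, 0, 0]); // little-endian byte layout
/// assert_eq!(as_u32(key.as_ref()).unwrap(), 42);
/// ```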
struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}

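/// Returns `true` unless `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or
/// `__NEXT_TEST_MODE` is set to a truthy value (anything other than unset,
/// `""`, `"0"`, or `"false"`). The result is computed once and cached in a
/// `LazyLock`, so the environment is only consulted on first use.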
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}

pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    /// `None` when the database is in-memory and there is nothing on disk to
    /// invalidate.
    base_path: Option<PathBuf>,
    /// Set once [`Self::invalidate`] has run, so that repeated panics or
    /// calls don't invalidate the database twice.
    invalidated: Mutex<bool>,
    /// Keeps the panic hook registered for as long as this storage is alive.
    _panic_hook_guard: Option<PanicHookGuard>,
}

pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

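    /// Opens a versioned, on-disk database under `base_path`. Checks for a
    /// pending invalidation left behind by a previous process, picks a
    /// version-specific subdirectory via `handle_db_versioning`, and (unless
    /// disabled, see [`should_invalidate_on_panic`]) registers a panic hook
    /// that invalidates the database when any thread panics.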
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // A panic hook can't propagate errors; ignore them.
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
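    /// Runs `f` with the given read transaction, or with a fresh short-lived
    /// transaction when the caller didn't supply one.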
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

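    /// Writes an invalidation marker (via `invalidate_db`) that
    /// `check_db_invalidation_and_cleanup` acts on during the next startup,
    /// and blocks further writes to the database. Idempotent: the
    /// `invalidated` flag ensures the marker is only written once. A no-op
    /// for in-memory databases (`base_path` is `None`).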
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

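    /// Reads the persisted next free task id, falling back to `TaskId::MIN`
    /// for a fresh database.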
    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(self
            .inner
            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
            .context("Unable to read next free task id from database")?
            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = turbo_bincode_decode(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
        encode_task_data(task, data)
    }

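    /// Persists one snapshot in a single committed write batch: per-task
    /// meta/data buffers, both directions of the task cache mapping, the next
    /// free task id, and the list of uncompleted operations. With a
    /// concurrent write batch the task data and task cache updates are
    /// written from parallel workers; with a serial batch the same key spaces
    /// are written on one thread.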
    fn save_snapshot<I>(
        &self,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<TurboBincodeBuffer>,
                    Option<TurboBincodeBuffer>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
        let mut batch = self.inner.database.write_batch()?;

        // Initial capacity for the reusable task type encode buffers below.
        const INITIAL_ENCODE_BUFFER_CAPACITY: usize = 1024;

        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    parallel::try_for_each(
                        &[KeySpace::TaskMeta, KeySpace::TaskData],
                        |&key_space| {
                            let _span = span.clone().entered();
                            unsafe { batch.flush(key_space) }
                        },
                    )?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
                        task_cache_updates,
                        |updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            let mut task_type_bytes =
                                TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
                            for (task_type, task_id) in updates {
                                task_type_bytes.clear();
                                encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                                let task_id: u32 = *task_id;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id);
                            }

                            Ok(max_task_id)
                        },
                    )?
                    .into_iter()
                    .max()
                    .unwrap_or(0);
                    next_task_id = next_task_id.max(max_task_id + 1);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    let task_items =
                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
                    for (task_id, meta, data) in task_items.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    anyhow!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    anyhow!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                    batch.flush(KeySpace::TaskMeta)?;
                    batch.flush(KeySpace::TaskData)?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::serial(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let mut task_type_bytes =
                        TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                        // `encode_task_type` appends to the buffer (and debug-asserts that
                        // it is empty), so clear the reused buffer before each iteration.
                        task_type_bytes.clear();
                        encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                        let task_id = *task_id;

                        batch
                            .put(
                                KeySpace::ForwardTaskCache,
                                WriteBuffer::Borrowed(&task_type_bytes),
                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_type:?} => {task_id}")
                            })?;
                        batch
                            .put(
                                KeySpace::ReverseTaskCache,
                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                WriteBuffer::Borrowed(&task_type_bytes),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_id} => {task_type:?}")
                            })?;
                        next_task_id = next_task_id.max(task_id + 1);
                    }
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::serial(batch),
                    next_task_id,
                    operations,
                )?;
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch
                .commit()
                .with_context(|| anyhow!("Unable to commit operations"))?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

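    /// Forward task cache lookup: encoded task type -> task id. Returns
    /// `Ok(None)` without touching the database when it is empty. The method
    /// is `unsafe` per the contract on `BackingStorageSealed`; presumably a
    /// caller-supplied `tx` must be a transaction on this very database.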
    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let mut task_type_bytes = TurboBincodeBuffer::new();
            encode_task_type(task_type, &mut task_type_bytes, None)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type_bytes)?
            else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
            // A fresh database can't contain any task cache entries, so skip
            // creating a read transaction.
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(turbo_bincode_decode(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                match category {
                    TaskDataCategory::Meta => KeySpace::TaskMeta,
                    TaskDataCategory::Data => KeySpace::TaskData,
                    TaskDataCategory::All => unreachable!(),
                },
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = turbo_bincode_decode(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

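/// Reads the persisted next free task id from the write batch, defaulting to
/// `1` (which matches `TaskId::MIN` in `next_free_task_id`) for a database
/// that has no entry yet.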
fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

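/// Writes the infra key space entries (the next free task id and the
/// serialized uncompleted operations) and flushes the Infra key space.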
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .context("Unable to write next free task id")?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations =
            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .context("Unable to write operations")?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

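/// Encodes a task type into `buffer` for use as a task cache key. With the
/// `verify_serialization` feature enabled, also decodes the result and
/// re-encodes it, panicking unless the value round-trips to identical bytes;
/// cache keys must be byte-stable for forward lookups to work.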
fn encode_task_type(
    task_type: &CachedTaskType,
    buffer: &mut TurboBincodeBuffer,
    task_id: Option<TaskId>,
) -> Result<()> {
    fn encode_once_into(
        task_type: &CachedTaskType,
        buffer: &mut TurboBincodeBuffer,
        task_id: Option<TaskId>,
    ) -> Result<()> {
        turbo_bincode_encode_into(task_type, buffer).with_context(|| {
            if let Some(task_id) = task_id {
                format!("Unable to serialize task {task_id} cache key {task_type:?}")
            } else {
                format!("Unable to serialize task cache key {task_type:?}")
            }
        })
    }

    debug_assert!(buffer.is_empty());
    encode_once_into(task_type, buffer, task_id)?;

    if cfg!(feature = "verify_serialization") {
        macro_rules! println_and_panic {
            ($($tt:tt)*) => {
                println!($($tt)*);
                panic!($($tt)*);
            };
        }
        let deserialize: Result<CachedTaskType, _> = turbo_bincode_decode(buffer);
        match deserialize {
            Err(err) => {
                println_and_panic!("Task type would not be deserializable:\n{err:?}");
            }
            Ok(task_type2) => {
                if &task_type2 != task_type {
                    println_and_panic!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                }
                let mut buffer2 = TurboBincodeBuffer::new();
                match encode_once_into(&task_type2, &mut buffer2, task_id) {
                    Err(err) => {
                        println_and_panic!(
                            "Task type would not be serializable the second time:\n{err:?}"
                        );
                    }
                    Ok(()) => {
                        if buffer2 != *buffer {
                            println_and_panic!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                buffer,
                                buffer2
                            );
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

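/// Distributes already-encoded per-task meta/data buffers in parallel. With a
/// concurrent write batch every worker writes directly into the batch and the
/// returned `SerializedTasks` vectors are empty; with `batch: None` (the
/// serial path) the buffers are collected and returned for the caller to
/// write.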
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<TurboBincodeBuffer>,
                Option<TurboBincodeBuffer>,
            ),
        > + Send
        + Sync,
{
    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
        let mut result = Vec::new();
        for (task_id, meta, data) in tasks {
            if let Some(batch) = batch {
                let key = IntKey::new(*task_id);
                let key = key.as_ref();
                if let Some(meta) = meta {
                    batch.put(
                        KeySpace::TaskMeta,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(meta),
                    )?;
                }
                if let Some(data) = data {
                    batch.put(
                        KeySpace::TaskData,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(data),
                    )?;
                }
            } else {
                // No batch to write into (serial mode): hand the buffers back
                // to the caller.
                result.push((
                    task_id,
                    meta.map(WriteBuffer::SmallVec),
                    data.map(WriteBuffer::SmallVec),
                ));
            }
        }

        Ok(result)
    })
}

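/// Encodes a task's data items into one buffer. The happy path is a single
/// encode of the whole vector. If that fails, or the `verify_serialization`
/// feature is enabled, each item is checked individually: non-encodable
/// optional items are dropped (with a log line under `verify_serialization`),
/// while a non-encodable required item is a hard error.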
fn encode_task_data(task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
    let orig_result = turbo_bincode_encode(data);
    if !cfg!(feature = "verify_serialization")
        && let Ok(value) = orig_result
    {
        return Ok(value);
    }

    let mut error = Ok(());
    let mut filtered_data = data.clone();
    filtered_data.retain(|item| match turbo_bincode_encode(&item) {
        Ok(buf) => {
            if cfg!(feature = "verify_serialization") {
                let deserialized = turbo_bincode_decode::<CachedDataItem>(&buf);
                if let Err(err) = deserialized {
                    println!("Data item would not be deserializable {task}: {err:?}\n{item:?}");
                    return false;
                }
            }
            true
        }
        Err(err) => {
            if item.is_optional() {
                if cfg!(feature = "verify_serialization") {
                    println!(
                        "Skipping non-encodable optional item for {task}: {item:?} due to {err}"
                    );
                }
            } else {
                error =
                    Err(err).context(format!("Unable to encode data item for {task}: {item:?}"));
            }
            false
        }
    });
    error?;

    (if filtered_data.len() == data.len() {
        orig_result
    } else {
        turbo_bincode_encode(&filtered_data)
    })
    .with_context(|| format!("Unable to serialize data items for {task}: {filtered_data:#?}"))
}