1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{self, File, OpenOptions, ReadDir},
5 io::{BufWriter, Write},
6 mem::take,
7 ops::RangeInclusive,
8 path::{Path, PathBuf},
9 sync::{
10 OnceLock,
11 atomic::{AtomicBool, AtomicU32, Ordering},
12 },
13};
14
15use anyhow::{Context, Result, bail};
16use byteorder::{BE, ReadBytesExt, WriteBytesExt};
17use dashmap::DashSet;
18use jiff::Timestamp;
19use memmap2::Mmap;
20use nohash_hasher::BuildNoHashHasher;
21use parking_lot::{Mutex, RwLock};
22use smallvec::SmallVec;
23use tracing::span::EnteredSpan;
24
25pub use crate::compaction::selector::CompactConfig;
26use crate::{
27 DbConfig, FamilyKind, QueryKey,
28 arc_bytes::ArcBytes,
29 compaction::selector::{Compactable, get_merge_segments},
30 compression::{checksum_block, decompress_into_arc},
31 constants::{
32 DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
33 MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
34 },
35 key::{StoreKey, hash_key},
36 lookup_entry::{IterValue, LookupEntry, LookupValue},
37 merge_iter::MergeIter,
38 meta_file::{MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
39 meta_file_builder::MetaFileBuilder,
40 mmap_helper::advise_mmap_for_persistence,
41 parallel_scheduler::ParallelScheduler,
42 rc_bytes::RcBytes,
43 sst_filter::SstFilter,
44 static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFileIter},
45 static_sorted_file_builder::{StaticSortedFileBuilderMeta, StreamingSstWriter},
46 write_batch::{FinishResult, WriteBatch},
47};
48
/// Snapshot of the counters of a single cache.
///
/// Only compiled when the `stats` feature is enabled.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Ratio of hits to total lookups (`hits / (hits + misses)`).
    pub hit_rate: f32,
    /// Ratio of the current cache weight to the cache capacity.
    pub fill: f32,
    /// Number of items in the cache (`cache.len()`).
    pub items: usize,
    /// Current weight of the cache (`cache.weight()`).
    pub size: u64,
    /// Number of hits reported by the cache.
    pub hits: u64,
    /// Number of misses reported by the cache.
    pub misses: u64,
}
59
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Takes a snapshot of the statistics counters of `cache`.
    ///
    /// `hit_rate` is `hits / (hits + misses)` and `fill` is `weight / capacity`. Both ratios are
    /// reported as `0.0` when their denominator is zero (no lookups recorded yet, or a cache
    /// with zero capacity) instead of the `NaN` a plain floating point division would yield.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        /// Divides `numerator` by `denominator`, mapping a zero denominator to `0.0`.
        fn ratio(numerator: f32, denominator: f32) -> f32 {
            if denominator == 0.0 {
                0.0
            } else {
                numerator / denominator
            }
        }

        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
            hit_rate: ratio(hits as f32, (hits + misses) as f32),
            fill: ratio(size as f32, cache.capacity() as f32),
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
83
/// Aggregated statistics of the database.
///
/// Only compiled when the `stats` feature is enabled.
// NOTE(review): the code that fills this struct is not part of the visible chunk; the field
// descriptions below are derived from the field names and types only — confirm against the
// producer.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Presumably the number of meta files.
    pub meta_files: usize,
    /// Presumably the number of SST files.
    pub sst_files: usize,
    /// Statistics of the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics of the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Presumably the total number of lookup hits.
    pub hits: u64,
    /// Presumably the total number of lookup misses.
    pub misses: u64,
    /// Miss counter labelled "family" — TODO confirm exact meaning.
    pub miss_family: u64,
    /// Miss counter labelled "range" — TODO confirm exact meaning.
    pub miss_range: u64,
    /// Miss counter labelled "amqf" — TODO confirm exact meaning.
    pub miss_amqf: u64,
    /// Miss counter labelled "key" — TODO confirm exact meaning.
    pub miss_key: u64,
}
98
/// Atomic counters stored in the `stats` field of `TurboPersistence` when the `stats` feature
/// is enabled.
///
/// All counters start at zero (`Default`).
// NOTE(review): the code that increments these counters is outside the visible chunk; the
// meaning of each counter is inferred from its name only.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}
111
/// State stored in the write-operation slot of the database while it is not idle (`None` in
/// the slot means no write operation is active).
enum ActiveWriteState {
    /// A write operation is in progress. The string is the operation name passed to
    /// `acquire_write_operation`; it is used in the error reported to a second writer.
    Active(&'static str),
    /// Cleanup after a failed write operation failed as well (see the `Drop` impl of
    /// `WriteOperationGuard`). `acquire_write_operation` rejects all writes in this state.
    Error,
}
121
/// A file deletion that is queued in `TurboPersistence::deferred_deletions`.
///
/// `commit` wraps the values yielded by `try_delete_files` in these variants, one variant per
/// file kind.
// NOTE(review): the `u32` is presumably the sequence number of the file that could not be
// deleted yet — confirm against `try_delete_files`, which is outside the visible chunk.
enum DeferredDeletion {
    /// Refers to a `.sst` file.
    Sst(u32),
    /// Refers to a `.meta` file.
    Meta(u32),
    /// Refers to a `.blob` file.
    Blob(u32),
}
132
/// Guard representing the single active write operation of a database.
///
/// Created by `acquire_write_operation`. Dropping the guard releases the write-operation slot;
/// unless [`WriteOperationGuard::success`] was called first, the drop handler additionally
/// rolls back the directory via `delete_orphan_files`.
pub(crate) struct WriteOperationGuard<'a> {
    /// The write-operation slot of the owning database, updated on drop.
    active: &'a Mutex<Option<ActiveWriteState>>,
    /// The database directory, passed to `delete_orphan_files` on rollback.
    path: &'a Path,
    /// The database sequence number read when the guard was acquired.
    seq_before: u32,
    /// Whether the operation was marked as successful.
    succeeded: bool,
}
151
impl WriteOperationGuard<'_> {
    /// Marks the write operation as successful, so that dropping the guard only releases the
    /// write-operation slot and does not roll back any files.
    pub(crate) fn success(&mut self) {
        self.succeeded = true;
    }
}
160
161fn delete_orphan_files(path: &Path, seq_before: u32) -> Result<()> {
166 let mut current_file = OpenOptions::new()
170 .write(true)
171 .truncate(false)
172 .read(false)
173 .open(path.join("CURRENT"))?;
174 current_file.write_u32::<BE>(seq_before)?;
175 current_file.sync_all()?;
176
177 for entry in fs::read_dir(path)? {
178 let entry = entry?;
179 let path = entry.path();
180 if let Some(ext) = path.extension().and_then(|s| s.to_str())
181 && let Some(seq) = path
182 .file_stem()
183 .and_then(|s| s.to_str())
184 .and_then(|s| s.parse::<u32>().ok())
185 && seq > seq_before
186 {
187 match ext {
188 "sst" | "meta" | "blob" | "del" => fs::remove_file(&path)?,
189 _ => {}
190 }
191 }
192 }
193 Ok(())
194}
195
196impl Drop for WriteOperationGuard<'_> {
197 fn drop(&mut self) {
198 if self.succeeded {
199 *self.active.lock() = None;
201 return;
202 }
203
204 match delete_orphan_files(self.path, self.seq_before) {
207 Ok(()) => *self.active.lock() = None,
208 Err(_) => *self.active.lock() = Some(ActiveWriteState::Error),
209 }
210 }
211}
212
/// A database stored in a directory, generic over the scheduler `S` used for parallel work
/// and the number of key families `FAMILIES`.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    /// Scheduler used to run work in parallel (e.g. opening meta files, syncing new files).
    parallel_scheduler: S,
    /// The directory that contains the database files.
    path: PathBuf,
    /// When `true`, write operations are rejected by `acquire_write_operation`.
    read_only: bool,
    /// The mutable in-memory state (meta files, current sequence number, accessed key hashes).
    inner: RwLock<Inner<FAMILIES>>,
    /// Mirrors whether `inner.meta_files` is empty; updated on load and on commit.
    is_empty: AtomicBool,
    /// Slot describing the currently active write operation (`None` when idle).
    active_write_operation: Mutex<Option<ActiveWriteState>>,
    /// Deletions queued by `commit` from the results of `try_delete_files`.
    deferred_deletions: Mutex<Vec<DeferredDeletion>>,
    /// Lazily initialized cache for key blocks (see `key_block_cache()`).
    key_block_cache: OnceLock<BlockCache>,
    /// Lazily initialized cache for value blocks (see `value_block_cache()`).
    value_block_cache: OnceLock<BlockCache>,
    /// The database configuration, including the per-family configuration.
    config: DbConfig<FAMILIES>,
    /// Statistics counters, only present with the `stats` feature.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
247
/// The state of the database that is protected by the `inner` lock.
struct Inner<const FAMILIES: usize> {
    /// The currently loaded meta files. New meta files are appended on commit.
    meta_files: Vec<MetaFile>,
    /// The sequence number of the database state (the value stored in the `CURRENT` file).
    current_sequence_number: u32,
    /// One set of key hashes per family. `commit_write_batch` drains a family's set into an
    /// AMQF when finishing a write batch.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}
259
/// The input of `TurboPersistence::commit`: the files produced by a write operation and the
/// files that become obsolete through it.
pub struct CommitOptions {
    /// Newly written meta files as `(sequence number, open file)` pairs.
    new_meta_files: Vec<(u32, File)>,
    /// Newly written SST files as `(sequence number, open file)` pairs.
    new_sst_files: Vec<(u32, File)>,
    /// Newly written blob files as `(sequence number, open file)` pairs.
    new_blob_files: Vec<(u32, File)>,
    /// Sequence numbers of SST files to delete as part of the commit.
    sst_seq_numbers_to_delete: Vec<u32>,
    /// Sequence numbers of blob files to delete as part of the commit.
    blob_seq_numbers_to_delete: Vec<u32>,
    /// The sequence number to commit. `commit` increments it by one when it has to write a
    /// `.del` file.
    sequence_number: u32,
    /// Number of keys written; only used for the `LOG` entry.
    keys_written: u64,
}
269
/// Bundle of the arguments of `TurboPersistence::new`.
struct OpenOpts<S: ParallelScheduler, const FAMILIES: usize> {
    /// The database directory.
    path: PathBuf,
    /// Whether the instance rejects write operations.
    read_only: bool,
    /// The scheduler used for parallel work.
    parallel_scheduler: S,
    /// The database configuration.
    config: DbConfig<FAMILIES>,
}
276
277impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
278 pub fn open(path: PathBuf) -> Result<Self> {
283 Self::open_with_parallel_scheduler(path, Default::default())
284 }
285
286 pub fn open_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
288 Self::open_with_config_and_parallel_scheduler(path, config, Default::default())
289 }
290
291 pub fn open_read_only_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
294 Self::open_read_only_with_parallel_scheduler(path, config, Default::default())
295 }
296
297 pub fn empty_in_memory_with_config(config: DbConfig<FAMILIES>) -> Self {
301 Self::new(OpenOpts {
304 path: PathBuf::new(),
305 read_only: true,
306 parallel_scheduler: Default::default(),
307 config,
308 })
309 }
310}
311
312impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
313 fn new(
314 OpenOpts {
315 path,
316 read_only,
317 parallel_scheduler,
318 config,
319 }: OpenOpts<S, FAMILIES>,
320 ) -> Self {
321 Self {
322 parallel_scheduler,
323 path,
324 read_only,
325 inner: RwLock::new(Inner {
326 meta_files: Vec::new(),
327 current_sequence_number: 0,
328 accessed_key_hashes: [(); FAMILIES]
329 .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
330 }),
331 is_empty: AtomicBool::new(true),
332 active_write_operation: Mutex::new(None),
333 deferred_deletions: Mutex::new(Vec::new()),
334 key_block_cache: OnceLock::new(),
335 value_block_cache: OnceLock::new(),
336 config,
337 #[cfg(feature = "stats")]
338 stats: TrackedStats::default(),
339 }
340 }
341
    /// Opens (or creates) the database in `path` for reading and writing, using the default
    /// configuration and the given `parallel_scheduler`.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, DbConfig::default(), parallel_scheduler)
    }
349
350 pub fn open_with_config_and_parallel_scheduler(
352 path: PathBuf,
353 config: DbConfig<FAMILIES>,
354 parallel_scheduler: S,
355 ) -> Result<Self> {
356 let mut db = Self::new(OpenOpts {
357 path,
358 read_only: false,
359 parallel_scheduler,
360 config,
361 });
362 db.open_directory(false)?;
363 Ok(db)
364 }
365
366 fn open_read_only_with_parallel_scheduler(
369 path: PathBuf,
370 config: DbConfig<FAMILIES>,
371 parallel_scheduler: S,
372 ) -> Result<Self> {
373 let mut db = Self::new(OpenOpts {
374 path,
375 read_only: true,
376 parallel_scheduler,
377 config,
378 });
379 db.open_directory(false)?;
380 Ok(db)
381 }
382
383 fn open_directory(&mut self, read_only: bool) -> Result<()> {
385 match fs::read_dir(&self.path) {
386 Ok(entries) => {
387 if !self
388 .load_directory(entries, read_only)
389 .context("Loading persistence directory failed")?
390 {
391 if read_only {
392 bail!("Failed to open database");
393 }
394 self.init_directory()
395 .context("Initializing persistence directory failed")?;
396 }
397 Ok(())
398 }
399 Err(e) => {
400 if !read_only && e.kind() == std::io::ErrorKind::NotFound {
401 self.create_and_init_directory()
402 .context("Creating and initializing persistence directory failed")?;
403 Ok(())
404 } else {
405 Err(e).context("Failed to open database")
406 }
407 }
408 }
409 }
410
    /// Creates the database directory (including missing parent directories) and initializes
    /// it via `init_directory`.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
416
    /// Initializes the database directory by creating (or truncating) the `CURRENT` file and
    /// writing sequence number `0` to it as a big-endian `u32`.
    fn init_directory(&mut self) -> Result<()> {
        let mut current = File::create(self.path.join("CURRENT"))?;
        current.write_u32::<BE>(0)?;
        current.flush()?;
        Ok(())
    }
424
    /// Loads the database state from the directory entries `entries`.
    ///
    /// Reads the current sequence number from `CURRENT`, walks all directory entries, opens
    /// the remaining meta files and stores the result in `self.inner`.
    ///
    /// Returns `Ok(false)` when `CURRENT` does not exist and `read_only` is `false` (the
    /// directory still needs to be initialized) and `Ok(true)` once the state was loaded.
    ///
    /// When `read_only` is `false` this also cleans up the directory: files with a sequence
    /// number greater than the current one are removed, and files listed in `.del` files are
    /// removed.
    ///
    /// # Errors
    ///
    /// Fails on I/O errors, when a file with an extension has a stem that is not a `u32`, and
    /// when an unexpected file (not starting with `.`) is found in the directory.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    // Not initialized yet; the caller initializes the directory.
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        // Sequence numbers listed in the `.del` files seen so far.
        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Files with an extension must be named after their sequence number.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Newer than the committed state: removed unless read-only.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A `.del` file is a sequence of big-endian `u32` sequence numbers.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            // The `.del` file itself is only removed when none of the listed
                            // files had to be removed in this pass.
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // Nothing to do here for these files.
                        }
                        _ => {
                            // Unknown extensions are only tolerated for dot-files.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read above.
                    }
                    Some("LOG") => {
                        // The log file is not needed for loading.
                    }
                    _ => {
                        // Other extension-less entries are only tolerated for dot-files.
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        // A `.del` file may have been visited after the meta file it lists, so filter again.
        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // The filter is applied from the highest to the lowest sequence number.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        self.is_empty
            .store(meta_files.is_empty(), Ordering::Relaxed);
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
541
542 #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
544 fn read_blob(&self, seq: u32) -> Result<ArcBytes> {
545 let path = self.path.join(format!("{seq:08}.blob"));
546 let file = File::open(&path)
547 .with_context(|| format!("Failed to open blob file {}", path.display()))?;
548 let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
549 format!(
550 "Failed to mmap blob file {} ({} bytes)",
551 path.display(),
552 file.metadata().map(|m| m.len()).unwrap_or(0)
553 )
554 })?;
555 #[cfg(unix)]
556 mmap.advise(memmap2::Advice::Sequential)?;
557 #[cfg(unix)]
558 mmap.advise(memmap2::Advice::WillNeed)?;
559 advise_mmap_for_persistence(&mmap)?;
560 let mut reader = &mmap[..];
561 let uncompressed_length = reader
562 .read_u32::<BE>()
563 .context("Failed to read uncompressed length from blob file")?;
564 let expected_checksum = reader.read_u32::<BE>()?;
565
566 let actual_checksum = checksum_block(reader);
568 if actual_checksum != expected_checksum {
569 bail!(
570 "Cache corruption detected: checksum mismatch in blob file {:08}.blob (expected \
571 {:08x}, got {:08x})",
572 seq,
573 expected_checksum,
574 actual_checksum
575 );
576 }
577
578 let buffer = decompress_into_arc(uncompressed_length, reader)?;
579 Ok(ArcBytes::from(buffer))
580 }
581
    /// Returns the value of the `is_empty` flag, which is set to `meta_files.is_empty()`
    /// whenever the directory is loaded or a commit is applied.
    pub fn is_empty(&self) -> bool {
        self.is_empty.load(Ordering::Relaxed)
    }
586
    /// Returns `true` when the write-operation slot is in the `Error` state, i.e. the cleanup
    /// after a failed write operation failed and no further writes are accepted.
    pub fn has_unrecoverable_write_error(&self) -> bool {
        matches!(
            *self.active_write_operation.lock(),
            Some(ActiveWriteState::Error)
        )
    }
595
596 fn acquire_write_operation(&self, name: &'static str) -> Result<WriteOperationGuard<'_>> {
600 if self.read_only {
601 bail!("Cannot perform write operations on a read-only database");
602 }
603 let mut slot = self.active_write_operation.lock();
604 match &*slot {
605 Some(ActiveWriteState::Active(active_name)) => {
606 bail!(
607 "Another {active_name} is already active (only a single write operation is \
608 allowed at a time)"
609 );
610 }
611 Some(ActiveWriteState::Error) => {
612 bail!(
613 "A previous write operation failed with an unrecoverable error; no further \
614 writes are possible"
615 );
616 }
617 None => {}
618 }
619 *slot = Some(ActiveWriteState::Active(name));
620 drop(slot); let seq_before = self.inner.read().current_sequence_number;
622 Ok(WriteOperationGuard {
623 active: &self.active_write_operation,
624 path: &self.path,
625 seq_before,
626 succeeded: false,
627 })
628 }
629
    /// Starts a new write batch.
    ///
    /// Acquires the write-operation slot under the name "write batch" and hands the guard,
    /// together with the sequence number captured by it, to the new `WriteBatch`.
    ///
    /// # Errors
    ///
    /// Fails when the write-operation slot cannot be acquired (read-only database, another
    /// active write operation, or a previous unrecoverable write error).
    pub fn write_batch<K: StoreKey + Send + Sync>(&self) -> Result<WriteBatch<'_, K, S, FAMILIES>> {
        let guard = self.acquire_write_operation("write batch")?;
        let current = guard.seq_before;
        Ok(WriteBatch::new(
            guard,
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
            self.config.family_configs,
        ))
    }
646
    /// Returns the key block cache, creating it on first use.
    ///
    /// The cache is created with `KEY_BLOCK_CACHE_SIZE / KEY_BLOCK_AVG_SIZE` as first and
    /// `KEY_BLOCK_CACHE_SIZE` as second argument.
    // NOTE(review): presumably these are the estimated item count and the weight capacity of
    // the cache — confirm against the `BlockCache` definition.
    fn key_block_cache(&self) -> &BlockCache {
        self.key_block_cache.get_or_init(|| {
            BlockCache::with(
                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
                KEY_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            )
        })
    }
658
    /// Returns the value block cache, creating it on first use.
    ///
    /// The cache is created with `VALUE_BLOCK_CACHE_SIZE / VALUE_BLOCK_AVG_SIZE` as first and
    /// `VALUE_BLOCK_CACHE_SIZE` as second argument.
    // NOTE(review): presumably these are the estimated item count and the weight capacity of
    // the cache — confirm against the `BlockCache` definition.
    fn value_block_cache(&self) -> &BlockCache {
        self.value_block_cache.get_or_init(|| {
            BlockCache::with(
                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
                VALUE_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            )
        })
    }
670
    /// Clears both block caches and calls `clear_cache` on every loaded meta file.
    ///
    /// Takes the write lock on the inner state while iterating the meta files.
    pub fn clear_cache(&self) {
        self.clear_block_caches();
        for meta in self.inner.write().meta_files.iter_mut() {
            meta.clear_cache();
        }
    }
678
    /// Clears the key and value block caches.
    ///
    /// Caches that have not been initialized yet are left uninitialized.
    pub fn clear_block_caches(&self) {
        if let Some(cache) = self.key_block_cache.get() {
            cache.clear();
        }
        if let Some(cache) = self.value_block_cache.get() {
            cache.clear();
        }
    }
689
    /// Calls `prepare_sst_cache` on every loaded meta file.
    ///
    /// Takes the write lock on the inner state while iterating the meta files.
    pub fn prepare_all_sst_caches(&self) {
        for meta in self.inner.write().meta_files.iter_mut() {
            meta.prepare_sst_cache();
        }
    }
697
    /// Opens the `LOG` file in the database directory for appending (creating it when
    /// missing) and wraps it in a buffered writer.
    ///
    /// # Panics
    ///
    /// Panics when called on a read-only database.
    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }
709
    /// Finishes `write_batch` and commits the files it produced to the database.
    ///
    /// While finishing, the write batch calls the closure below per family to obtain an AMQF
    /// built from the hashes collected in `accessed_key_hashes` for that family. The batch is
    /// marked as succeeded only after the commit went through.
    ///
    /// # Errors
    ///
    /// Fails when finishing the write batch or committing fails.
    ///
    /// # Panics
    ///
    /// Panics when called on a read-only database, and when the AMQF cannot be created or a
    /// fingerprint cannot be inserted.
    pub fn commit_write_batch<K: StoreKey + Send + Sync>(
        &self,
        mut write_batch: WriteBatch<'_, K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // Reserve roughly 5% more than the current length of the set.
            let initial_capacity = set.len() * 20 / 19;
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // Move every hash into the filter; returning `false` removes it from the set.
            set.retain(|hash| {
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        write_batch.mark_succeeded();
        Ok(())
    }
764
    /// Makes the files described by the `CommitOptions` part of the database.
    ///
    /// The steps, in order:
    /// 1. Sync all new meta, SST and blob files (in parallel), re-open the new meta files as
    ///    `MetaFile`s, and sync the database directory.
    /// 2. Under the read lock, determine which entries of existing meta files and which whole
    ///    meta files become obsolete.
    /// 3. If anything has to be deleted, increment the sequence number and write a
    ///    `{seq:08}.del` file listing the SST, meta and blob sequence numbers to delete.
    /// 4. Write the new sequence number to `CURRENT` and sync it.
    /// 5. Append a description of the commit to `LOG` (failures are only printed to stderr).
    /// 6. Under the write lock, apply the changes to the in-memory state.
    /// 7. Try to delete the obsolete files and queue what `try_delete_files` yields as
    ///    deferred deletions.
    ///
    /// # Errors
    ///
    /// Fails on I/O errors in steps 1, 3 and 4 and when a new meta file cannot be opened.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let sync_span = tracing::trace_span!("sync new files").entered();

        // Work item for the parallel sync phase.
        enum SyncItem {
            Meta(u32, File),
            Sst(File),
            Blob(u32, File),
        }
        // Result of syncing one `SyncItem`.
        enum SyncResult {
            Meta(MetaFile),
            Sst,
            Blob(u32, File),
        }

        let mut sync_items: Vec<SyncItem> =
            Vec::with_capacity(new_meta_files.len() + new_sst_files.len() + new_blob_files.len());
        for (seq, file) in new_meta_files {
            sync_items.push(SyncItem::Meta(seq, file));
        }
        for (_, file) in new_sst_files {
            sync_items.push(SyncItem::Sst(file));
        }
        for (seq, file) in new_blob_files {
            sync_items.push(SyncItem::Blob(seq, file));
        }

        let results: Vec<SyncResult> = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(sync_items, |item| match item {
                SyncItem::Meta(seq, file) => {
                    file.sync_data()?;
                    // The synced meta file is re-opened from the directory by sequence number.
                    let meta_file = MetaFile::open(&self.path, seq)?;
                    Ok(SyncResult::Meta(meta_file))
                }
                SyncItem::Sst(file) => {
                    file.sync_data()?;
                    Ok(SyncResult::Sst)
                }
                SyncItem::Blob(seq, file) => {
                    file.sync_data()?;
                    Ok(SyncResult::Blob(seq, file))
                }
            })?;

        // Split the results back into meta files and blob files.
        let mut new_meta_files: Vec<MetaFile> = Vec::new();
        let mut new_blob_files: Vec<(u32, File)> = Vec::new();
        for result in results {
            match result {
                SyncResult::Meta(mf) => new_meta_files.push(mf),
                SyncResult::Sst => {}
                SyncResult::Blob(seq, file) => new_blob_files.push((seq, file)),
            }
        }

        // The filter is applied to the new meta files in reverse order first; the same
        // filter instance is used for the existing meta files below.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        // Sync the database directory itself.
        File::open(&self.path)?.sync_data()?;
        drop(sync_span);

        // Collected up front for the LOG entry written further below.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();
        let entries_to_remove;

        {
            // Only a read lock is held while the removals are computed; they are applied
            // under the write lock after the on-disk state was updated.
            let inner = self.inner.read();

            // One result per existing meta file, in reverse order of `inner.meta_files`.
            entries_to_remove = inner
                .meta_files
                .iter()
                .rev()
                .map(|meta_file| sst_filter.apply_filter_collect(meta_file))
                .collect::<Vec<_>>();

            for meta_file in new_meta_files.iter().rev() {
                let should_remove = sst_filter.apply_and_get_remove(meta_file);
                debug_assert!(
                    !should_remove,
                    "newly created meta file should never be a candidate for removal"
                );
            }
            for i in (0..inner.meta_files.len()).rev() {
                if sst_filter.apply_and_get_remove(&inner.meta_files[i]) {
                    meta_seq_numbers_to_delete.push(inner.meta_files[i].sequence_number());
                }
            }

            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
        }
        if has_delete_file {
            // The `.del` file gets its own sequence number.
            seq += 1;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // The `.del` file is the concatenation of all sequence numbers as big-endian
                // `u32`s: SST first, then meta, then blob.
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_data()?;
            }

            // Overwrite the sequence number at the start of the existing CURRENT file.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_data()?;

            // Writing the LOG is best effort: CURRENT has already been updated, so a failure
            // here is only reported on stderr and does not fail the commit.
            if let Err(e) = (|| {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                // Writes the sequence numbers of `items` in lines of up to 15 entries, each
                // line followed by `label`.
                fn write_seq_numbers<W: std::io::Write, T>(
                    log: &mut W,
                    items: &[T],
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()> {
                    for chunk in items.chunks(15) {
                        write!(log, " | |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, &new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    &blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(
                    &mut log,
                    &sst_seq_numbers_to_delete,
                    "SST DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(
                    &mut log,
                    &meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                anyhow::Ok(())
            })() {
                eprintln!("turbo-persistence: failed to write LOG after commit {seq:08}: {e:#}");
            }

            anyhow::Ok(())
        })?;

        {
            // Apply the changes computed above to the in-memory state.
            let mut inner = self.inner.write();

            // `entries_to_remove` was collected in reverse order, so it is reversed again to
            // line up with the forward iteration over the meta files.
            for (meta_file, to_remove) in inner
                .meta_files
                .iter_mut()
                .zip(entries_to_remove.into_iter().rev())
            {
                if !to_remove.is_empty() {
                    meta_file.retain_entries(|seq| !to_remove.contains(&seq));
                }
            }

            inner.meta_files.append(&mut new_meta_files);
            if !meta_seq_numbers_to_delete.is_empty() {
                let to_delete: HashSet<u32> = meta_seq_numbers_to_delete.iter().copied().collect();
                inner
                    .meta_files
                    .retain(|meta| !to_delete.contains(&meta.sequence_number()));
            }
            inner.current_sequence_number = seq;
            self.is_empty
                .store(inner.meta_files.is_empty(), Ordering::Relaxed);
        }

        // NOTE(review): `try_delete_files` is defined outside the visible chunk; presumably it
        // yields the sequence numbers of files that could not be deleted right now — confirm.
        self.deferred_deletions.lock().extend(
            Self::try_delete_files(&self.path, &sst_seq_numbers_to_delete, "sst")
                .map(DeferredDeletion::Sst)
                .chain(
                    Self::try_delete_files(&self.path, &meta_seq_numbers_to_delete, "meta")
                        .map(DeferredDeletion::Meta),
                )
                .chain(
                    Self::try_delete_files(&self.path, &blob_seq_numbers_to_delete, "blob")
                        .map(DeferredDeletion::Blob),
                ),
        );

        self.retry_deferred_deletions();

        #[cfg(feature = "verbose_log")]
        {
            // Dump the complete new database state to the LOG; errors are ignored.
            let _: Result<(), _> = (|| -> anyhow::Result<()> {
                let mut log = self.open_log()?;
                writeln!(log, "New database state:")?;
                writeln!(log, "FAM | META SEQ | SST SEQ FLAGS | RANGE")?;
                let inner = self.inner.read();
                // Each family once, in order of first appearance.
                let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                    let mut set = HashSet::new();
                    move |family| set.insert(*family)
                });
                for family in families {
                    for meta in inner.meta_files.iter() {
                        if meta.family() != family {
                            continue;
                        }
                        let meta_seq = meta.sequence_number();
                        for entry in meta.entries().iter() {
                            let seq = entry.sequence_number();
                            let range = entry.range();
                            writeln!(
                                log,
                                "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                entry.flags(),
                                range_to_str(range.min_hash, range.max_hash)
                            )?;
                        }
                    }
                }
                Ok(())
            })();
        }

        Ok(())
    }
1138
    /// Runs a compaction with a configuration whose merge count, byte and segment limits are
    /// all set to their maximum values, a minimum merge count of 2 and no minimum duplication
    /// size.
    ///
    /// Whether the compaction changed anything is not reported.
    ///
    /// # Errors
    ///
    /// Fails when [`Self::compact`] fails.
    pub fn full_compact(&self) -> Result<()> {
        self.compact(&CompactConfig {
            min_merge_count: 2,
            optimal_merge_count: usize::MAX,
            max_merge_count: usize::MAX,
            max_merge_bytes: u64::MAX,
            min_merge_duplication_bytes: 0,
            optimal_merge_duplication_bytes: u64::MAX,
            max_merge_segment_count: usize::MAX,
        })?;
        Ok(())
    }
1153
1154 pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
1159 let mut guard = self.acquire_write_operation("compaction")?;
1160
1161 self.clear_cache();
1166
1167 let mut sequence_number;
1168 let mut new_meta_files = Vec::new();
1169 let mut new_sst_files = Vec::new();
1170 let mut sst_seq_numbers_to_delete = Vec::new();
1171 let mut blob_seq_numbers_to_delete = Vec::new();
1172 let mut keys_written = 0;
1173
1174 {
1175 let inner = self.inner.read();
1176 sequence_number = AtomicU32::new(inner.current_sequence_number);
1177 self.compact_internal(
1178 &inner.meta_files,
1179 &sequence_number,
1180 &mut new_meta_files,
1181 &mut new_sst_files,
1182 &mut sst_seq_numbers_to_delete,
1183 &mut blob_seq_numbers_to_delete,
1184 &mut keys_written,
1185 compact_config,
1186 )
1187 .context("Failed to compact database")?;
1188 }
1189
1190 let has_changes = !new_meta_files.is_empty();
1191 if has_changes {
1192 self.commit(CommitOptions {
1193 new_meta_files,
1194 new_sst_files,
1195 new_blob_files: Vec::new(),
1196 sst_seq_numbers_to_delete,
1197 blob_seq_numbers_to_delete,
1198 sequence_number: *sequence_number.get_mut(),
1199 keys_written,
1200 })
1201 .context("Failed to commit the database compaction")?;
1202 }
1203
1204 guard.success();
1205 Ok(has_changes)
1206 }
1207
1208 fn compact_internal(
1210 &self,
1211 meta_files: &[MetaFile],
1212 sequence_number: &AtomicU32,
1213 new_meta_files: &mut Vec<(u32, File)>,
1214 new_sst_files: &mut Vec<(u32, File)>,
1215 sst_seq_numbers_to_delete: &mut Vec<u32>,
1216 blob_seq_numbers_to_delete: &mut Vec<u32>,
1217 keys_written: &mut u64,
1218 compact_config: &CompactConfig,
1219 ) -> Result<()> {
1220 if meta_files.is_empty() {
1221 return Ok(());
1222 }
1223
1224 struct SstWithRange {
1225 meta_index: usize,
1226 index_in_meta: u32,
1227 seq: u32,
1228 range: StaticSortedFileRange,
1229 size: u64,
1230 flags: MetaEntryFlags,
1231 }
1232
1233 impl Compactable for SstWithRange {
1234 fn range(&self) -> RangeInclusive<u64> {
1235 self.range.min_hash..=self.range.max_hash
1236 }
1237
1238 fn size(&self) -> u64 {
1239 self.size
1240 }
1241
1242 fn category(&self) -> u8 {
1243 if self.flags.cold() { 1 } else { 0 }
1246 }
1247 }
1248
1249 let ssts_with_ranges = meta_files
1250 .iter()
1251 .enumerate()
1252 .flat_map(|(meta_index, meta)| {
1253 meta.entries()
1254 .iter()
1255 .enumerate()
1256 .map(move |(index_in_meta, entry)| SstWithRange {
1257 meta_index,
1258 index_in_meta: index_in_meta as u32,
1259 seq: entry.sequence_number(),
1260 range: entry.range(),
1261 size: entry.size(),
1262 flags: entry.flags(),
1263 })
1264 })
1265 .collect::<Vec<_>>();
1266
1267 let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
1268
1269 for sst in ssts_with_ranges {
1270 sst_by_family[sst.range.family as usize].push(sst);
1271 }
1272
1273 let path = &self.path;
1274
1275 let log_mutex = Mutex::new(());
1276
1277 struct PartialResultPerFamily {
1278 new_meta_file: Option<(u32, File)>,
1279 new_sst_files: Vec<(u32, File)>,
1280 sst_seq_numbers_to_delete: Vec<u32>,
1281 blob_seq_numbers_to_delete: Vec<u32>,
1282 keys_written: u64,
1283 }
1284
1285 let mut compact_config = compact_config.clone();
1286 let merge_jobs = sst_by_family
1287 .into_iter()
1288 .enumerate()
1289 .filter_map(|(family, ssts_with_ranges)| {
1290 if compact_config.max_merge_segment_count == 0 {
1291 return None;
1292 }
1293 let (merge_jobs, real_merge_job_size) =
1294 get_merge_segments(&ssts_with_ranges, &compact_config);
1295 compact_config.max_merge_segment_count -= real_merge_job_size;
1296 Some((family, ssts_with_ranges, merge_jobs))
1297 })
1298 .collect::<Vec<_>>();
1299
1300 let result = self
1301 .parallel_scheduler
1302 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
1303 merge_jobs,
1304 |(family, ssts_with_ranges, merge_jobs)| {
1305 let family = family as u32;
1306
1307 if merge_jobs.is_empty() {
1308 return Ok(PartialResultPerFamily {
1309 new_meta_file: None,
1310 new_sst_files: Vec::new(),
1311 sst_seq_numbers_to_delete: Vec::new(),
1312 blob_seq_numbers_to_delete: Vec::new(),
1313 keys_written: 0,
1314 });
1315 }
1316
1317 let used_key_hashes: Option<qfilter::Filter> = {
1322 let filters: Vec<qfilter::FilterRef<'_>> = meta_files
1323 .iter()
1324 .filter(|m| m.family() == family)
1325 .filter_map(|meta_file| {
1326 meta_file.deserialize_used_key_hashes_amqf().transpose()
1327 })
1328 .collect::<Result<Vec<_>>>()?
1329 .into_iter()
1330 .filter(|amqf| !amqf.is_empty())
1331 .collect();
1332 if filters.is_empty() {
1333 None
1334 } else if filters.len() == 1 {
1335 Some(filters[0].to_owned())
1337 } else {
1338 let total_len: u64 = filters.iter().map(|f| f.len()).sum();
1339 let mut merged =
1342 qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
1343 .expect("Failed to create merged AMQF filter");
1344 for filter in &filters {
1345 merged
1346 .merge(false, filter)
1347 .expect("Failed to merge AMQF filters");
1348 }
1349 merged.shrink_to_fit();
1350 Some(merged)
1351 }
1352 };
1353
1354 let sst_seq_numbers_to_delete = merge_jobs
1356 .iter()
1357 .filter(|l| l.len() > 1)
1358 .flat_map(|l| l.iter().copied())
1359 .map(|index| ssts_with_ranges[index].seq)
1360 .collect::<Vec<_>>();
1361
1362 let span = tracing::trace_span!(
1364 "merge files",
1365 family = self.config.family_configs[family as usize].name
1366 );
1367 enum PartialMergeResult<'l> {
1368 Merged {
1369 new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1370 blob_seq_numbers_to_delete: Vec<u32>,
1371 keys_written: u64,
1372 indices: SmallVec<[usize; 1]>,
1373 },
1374 Move {
1375 seq: u32,
1376 meta: StaticSortedFileBuilderMeta<'l>,
1377 },
1378 }
1379 let merge_result = self
1380 .parallel_scheduler
1381 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
1382 let _span = span.clone().entered();
1383 if indices.len() == 1 {
1384 let index = indices[0];
1386 let meta_index = ssts_with_ranges[index].meta_index;
1387 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1388 let meta_file = &meta_files[meta_index];
1389 let entry = meta_file.entry(index_in_meta);
1390 let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
1391 let meta = StaticSortedFileBuilderMeta {
1392 min_hash: entry.min_hash(),
1393 max_hash: entry.max_hash(),
1394 amqf,
1395 block_count: entry.block_count(),
1396 size: entry.size(),
1397 flags: entry.flags(),
1398 entries: 0,
1399 };
1400 return Ok(PartialMergeResult::Move {
1401 seq: entry.sequence_number(),
1402 meta,
1403 });
1404 }
1405
1406 let iters = indices
1410 .iter()
1411 .map(|&index| {
1412 let meta_index = ssts_with_ranges[index].meta_index;
1413 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1414 let entry = meta_files[meta_index].entry(index_in_meta);
1415 StaticSortedFileIter::open(path, entry.sst_metadata())
1416 })
1417 .collect::<Result<Vec<_>>>()?;
1418
1419 let iter = MergeIter::new(iters.into_iter())?;
1420
1421 let mut blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
1422
1423 struct Collector {
1424 writer: Option<(u32, StreamingSstWriter<LookupEntry>)>,
1431 flags: MetaEntryFlags,
1432 new_sst_files:
1433 Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1434 last_hash: Option<u64>,
1437 }
1438 impl Collector {
1439 fn new(flags: MetaEntryFlags) -> Self {
1440 Self {
1441 writer: None,
1442 flags,
1443 new_sst_files: Vec::new(),
1444 last_hash: None,
1445 }
1446 }
1447
1448 fn ensure_writer(
1450 &mut self,
1451 path: &Path,
1452 sequence_number: &AtomicU32,
1453 ) -> Result<&mut StreamingSstWriter<LookupEntry>>
1454 {
1455 if self.writer.is_none() {
1456 let seq =
1457 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1458 let sst_path = path.join(format!("{seq:08}.sst"));
1459 let writer = StreamingSstWriter::new(
1460 &sst_path,
1461 self.flags,
1462 MAX_ENTRIES_PER_COMPACTED_FILE as u64,
1463 )?;
1464 self.writer = Some((seq, writer));
1465 }
1466 Ok(&mut self.writer.as_mut().unwrap().1)
1467 }
1468
1469 fn close_sst_file(&mut self, keys_written: &mut u64) -> Result<()> {
1473 if let Some((seq, writer)) = self.writer.take() {
1474 let _span =
1475 tracing::trace_span!("close merged sst file").entered();
1476 let (meta, file) = writer.close()?;
1477 *keys_written += meta.entries;
1478 self.new_sst_files.push((seq, file, meta));
1479 }
1480 Ok(())
1481 }
1482
1483 fn add_entry(
1487 &mut self,
1488 entry: LookupEntry,
1489 path: &Path,
1490 sequence_number: &AtomicU32,
1491 keys_written: &mut u64,
1492 ) -> Result<()> {
1493 let key_changed = self.last_hash != Some(entry.hash);
1494 if key_changed
1497 && let Some((_, ref writer)) = self.writer
1498 && writer.is_full(
1499 MAX_ENTRIES_PER_COMPACTED_FILE,
1500 DATA_THRESHOLD_PER_COMPACTED_FILE,
1501 )
1502 {
1503 self.close_sst_file(keys_written)?;
1504 }
1505 self.last_hash = Some(entry.hash);
1506 let writer = self.ensure_writer(path, sequence_number)?;
1507 writer.add(entry)?;
1508 Ok(())
1509 }
1510 }
1511 #[cfg(debug_assertions)]
1512 impl Drop for Collector {
1513 fn drop(&mut self) {
1514 if !std::thread::panicking() {
1515 assert!(
1516 self.writer.is_none(),
1517 "Collector dropped with an open writer"
1518 );
1519 }
1520 }
1521 }
1522 let mut used_collector = Collector::new(MetaEntryFlags::WARM);
1523 let mut unused_collector = Collector::new(MetaEntryFlags::COLD);
1524 let mut current_key: Option<RcBytes> = None;
1525 let mut keys_written = 0;
1526
1527 let mut skip_remaining_for_this_key = false;
1534 let family_config = &self.config.family_configs[family as usize];
1535
1536 for entry in iter {
1537 let entry = entry?;
1538 if current_key.as_ref() != Some(&entry.key) {
1539 skip_remaining_for_this_key = false;
1541 current_key = Some(entry.key.clone());
1542 }
1543 if !skip_remaining_for_this_key {
1544 let is_used = used_key_hashes
1545 .as_ref()
1546 .is_some_and(|amqf| amqf.contains_fingerprint(entry.hash));
1547 let collector = if is_used {
1548 &mut used_collector
1549 } else {
1550 &mut unused_collector
1551 };
1552 match family_config.kind {
1553 FamilyKind::MultiValue => {
1554 if matches!(entry.value, IterValue::Deleted) {
1557 skip_remaining_for_this_key = true;
1558 }
1559 }
1560 FamilyKind::SingleValue => {
1561 skip_remaining_for_this_key = true;
1564 }
1565 }
1566 collector.add_entry(
1567 entry,
1568 path,
1569 sequence_number,
1570 &mut keys_written,
1571 )?;
1572 } else {
1573 if let IterValue::Blob { sequence_number } = &entry.value {
1577 blob_seq_numbers_to_delete.push(*sequence_number);
1578 }
1579 }
1580 }
1581
1582 used_collector.close_sst_file(&mut keys_written)?;
1584 unused_collector.close_sst_file(&mut keys_written)?;
1585
1586 let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1587 new_sst_files.append(&mut used_collector.new_sst_files);
1588 Ok(PartialMergeResult::Merged {
1589 new_sst_files,
1590 blob_seq_numbers_to_delete,
1591 keys_written,
1592 indices,
1593 })
1594 })
1595 .with_context(|| {
1596 format!("Failed to merge database files for family {family}")
1597 })?;
1598
1599 let Some((sst_files_len, blob_delete_len)) = merge_result
1600 .iter()
1601 .map(|r| {
1602 if let PartialMergeResult::Merged {
1603 new_sst_files,
1604 blob_seq_numbers_to_delete,
1605 indices: _,
1606 keys_written: _,
1607 } = r
1608 {
1609 (new_sst_files.len(), blob_seq_numbers_to_delete.len())
1610 } else {
1611 (0, 0)
1612 }
1613 })
1614 .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
1615 else {
1616 unreachable!()
1617 };
1618
1619 let mut new_sst_files = Vec::with_capacity(sst_files_len);
1620 let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
1621
1622 let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1623 let mut meta_file_builder = MetaFileBuilder::new(family);
1624
1625 let mut keys_written = 0;
1626 self.parallel_scheduler.block_in_place(|| {
1627 let guard = log_mutex.lock();
1628 let mut log = self.open_log()?;
1629 writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
1630 for result in merge_result {
1631 match result {
1632 PartialMergeResult::Merged {
1633 new_sst_files: merged_new_sst_files,
1634 blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
1635 keys_written: merged_keys_written,
1636 indices,
1637 } => {
1638 writeln!(
1639 log,
1640 "{family:3} | {meta_seq:08} | MERGE \
1641 ({merged_keys_written} keys):"
1642 )?;
1643 for i in indices.iter() {
1644 let seq = ssts_with_ranges[*i].seq;
1645 let (min, max) = ssts_with_ranges[*i].range().into_inner();
1646 writeln!(
1647 log,
1648 "{family:3} | {meta_seq:08} | {seq:08} INPUT | {}",
1649 range_to_str(min, max)
1650 )?;
1651 }
1652 for (seq, file, meta) in merged_new_sst_files {
1653 let min = meta.min_hash;
1654 let max = meta.max_hash;
1655 writeln!(
1656 log,
1657 "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
1658 ({})",
1659 range_to_str(min, max),
1660 meta.flags
1661 )?;
1662
1663 meta_file_builder.add(seq, meta);
1664 new_sst_files.push((seq, file));
1665 }
1666 blob_seq_numbers_to_delete
1667 .extend(merged_blob_seq_numbers_to_delete);
1668 keys_written += merged_keys_written;
1669 }
1670 PartialMergeResult::Move { seq, meta } => {
1671 let min = meta.min_hash;
1672 let max = meta.max_hash;
1673 writeln!(
1674 log,
1675 "{family:3} | {meta_seq:08} | {seq:08} MOVED | {}",
1676 range_to_str(min, max)
1677 )?;
1678
1679 meta_file_builder.add(seq, meta);
1680 }
1681 }
1682 }
1683 drop(log);
1684 drop(guard);
1685
1686 anyhow::Ok(())
1687 })?;
1688
1689 for &seq in sst_seq_numbers_to_delete.iter() {
1690 meta_file_builder.add_obsolete_sst_file(seq);
1691 }
1692
1693 let meta_file = {
1694 let _span = tracing::trace_span!("write meta file").entered();
1695 self.parallel_scheduler
1696 .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
1697 };
1698
1699 Ok(PartialResultPerFamily {
1700 new_meta_file: Some((meta_seq, meta_file)),
1701 new_sst_files,
1702 sst_seq_numbers_to_delete,
1703 blob_seq_numbers_to_delete,
1704 keys_written,
1705 })
1706 },
1707 )?;
1708
1709 for PartialResultPerFamily {
1710 new_meta_file: inner_new_meta_file,
1711 new_sst_files: mut inner_new_sst_files,
1712 sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
1713 blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
1714 keys_written: inner_keys_written,
1715 } in result
1716 {
1717 new_meta_files.extend(inner_new_meta_file);
1718 new_sst_files.append(&mut inner_new_sst_files);
1719 sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
1720 blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
1721 *keys_written += inner_keys_written;
1722 }
1723
1724 Ok(())
1725 }
1726
1727 pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcBytes>> {
1730 debug_assert!(family < FAMILIES, "Family index out of bounds");
1731 if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1732 panic!(
1734 "only single valued tables can be queried with `get', call `get_multiple` instead"
1735 )
1736 }
1737 let span = tracing::trace_span!(
1738 "database read",
1739 name = self.config.family_configs[family].name,
1740 result_size = tracing::field::Empty
1741 )
1742 .entered();
1743 let results = self.get_impl::<K, false>(family, key, &span)?;
1744 debug_assert!(results.len() <= 1, "get() should return at most one result");
1745 Ok(results.into_iter().next())
1746 }
1747
1748 pub fn get_multiple<K: QueryKey>(
1758 &self,
1759 family: usize,
1760 key: &K,
1761 ) -> Result<SmallVec<[ArcBytes; 1]>> {
1762 debug_assert!(family < FAMILIES, "Family index out of bounds");
1763 if self.config.family_configs[family].kind != FamilyKind::MultiValue {
1764 panic!("only multi-valued tables can be queried with `get_multiple`")
1766 }
1767 let span = tracing::trace_span!(
1768 "database read multiple",
1769 name = self.config.family_configs[family].name,
1770 result_count = tracing::field::Empty,
1771 result_size = tracing::field::Empty
1772 )
1773 .entered();
1774 let results = self.get_impl::<K, true>(family, key, &span)?;
1775 Ok(results)
1776 }
1777
1778 fn get_impl<K: QueryKey, const FIND_ALL: bool>(
1783 &self,
1784 family: usize,
1785 key: &K,
1786 span: &EnteredSpan,
1787 ) -> Result<SmallVec<[ArcBytes; 1]>> {
1788 let hash = hash_key(key);
1789 let inner = self.inner.read();
1790 let mut output: SmallVec<[ArcBytes; 1]> = SmallVec::new();
1791 #[cfg(feature = "stats")]
1794 let mut found_in_sst = false;
1795
1796 let mut size = 0;
1797
1798 for meta in inner.meta_files.iter().rev() {
1799 match meta.lookup::<K, FIND_ALL>(
1800 family as u32,
1801 hash,
1802 key,
1803 self.key_block_cache(),
1804 self.value_block_cache(),
1805 )? {
1806 MetaLookupResult::FamilyMiss => {
1807 #[cfg(feature = "stats")]
1808 self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1809 }
1810 MetaLookupResult::RangeMiss => {
1811 #[cfg(feature = "stats")]
1812 self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
1813 }
1814 MetaLookupResult::QuickFilterMiss => {
1815 #[cfg(feature = "stats")]
1816 self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
1817 }
1818 MetaLookupResult::SstLookup(result) => match result {
1819 SstLookupResult::Found(values) => {
1820 #[cfg(feature = "stats")]
1821 {
1822 found_in_sst = true;
1823 }
1824 inner.accessed_key_hashes[family].insert(hash);
1825 for value in values {
1828 match value {
1829 LookupValue::Deleted => {
1830 #[cfg(feature = "stats")]
1831 self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1832 if !FIND_ALL {
1833 span.record("result_size", "deleted");
1834 return Ok(SmallVec::new());
1835 }
1836 if output.is_empty() {
1840 span.record("result_size", "deleted");
1841 } else {
1842 span.record("result_size", size);
1843 }
1844 return Ok(output);
1845 }
1846 LookupValue::Slice { value } => {
1847 #[cfg(feature = "stats")]
1848 self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1849 if !FIND_ALL {
1850 span.record("result_size", value.len());
1851 return Ok(SmallVec::from_buf([value]));
1852 }
1853 size += value.len();
1854 output.push(value);
1855 }
1856 LookupValue::Blob { sequence_number } => {
1857 #[cfg(feature = "stats")]
1858 self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1859 let blob = self.read_blob(sequence_number)?;
1860 if !FIND_ALL {
1861 span.record("result_size", blob.len());
1862 return Ok(SmallVec::from_buf([blob]));
1863 }
1864 size += blob.len();
1865 output.push(blob);
1866 }
1867 }
1868 }
1869 }
1870 SstLookupResult::NotFound => {
1871 #[cfg(feature = "stats")]
1872 self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
1873 }
1874 },
1875 }
1876 }
1877
1878 #[cfg(feature = "stats")]
1879 if !found_in_sst {
1880 self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1881 }
1882
1883 if FIND_ALL {
1884 span.record("result_count", output.len());
1885 }
1886 if output.is_empty() {
1887 span.record("result_size", "not_found");
1888 } else {
1889 span.record("result_size", size);
1890 }
1891 Ok(output)
1892 }
1893
1894 pub fn batch_get<K: QueryKey>(
1895 &self,
1896 family: usize,
1897 keys: &[K],
1898 ) -> Result<Vec<Option<ArcBytes>>> {
1899 debug_assert!(family < FAMILIES, "Family index out of bounds");
1900 if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1901 panic!("only single valued tables can be queried with `batch_get'")
1903 }
1904 let span = tracing::trace_span!(
1905 "database batch read",
1906 name = self.config.family_configs[family].name,
1907 keys = keys.len(),
1908 not_found = tracing::field::Empty,
1909 deleted = tracing::field::Empty,
1910 result_size = tracing::field::Empty
1911 )
1912 .entered();
1913 let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
1914 let mut empty_cells = keys.len();
1915 for (index, key) in keys.iter().enumerate() {
1916 let hash = hash_key(key);
1917 cells.push((hash, index, None));
1918 }
1919 cells.sort_by_key(|(hash, _, _)| *hash);
1920 let inner = self.inner.read();
1921 for meta in inner.meta_files.iter().rev() {
1922 let _result = meta.batch_lookup(
1923 family as u32,
1924 keys,
1925 &mut cells,
1926 &mut empty_cells,
1927 self.key_block_cache(),
1928 self.value_block_cache(),
1929 )?;
1930
1931 #[cfg(feature = "stats")]
1932 {
1933 let crate::meta_file::MetaBatchLookupResult {
1934 family_miss,
1935 range_misses,
1936 quick_filter_misses,
1937 sst_misses,
1938 hits: _,
1939 } = _result;
1940 if family_miss {
1941 self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1942 }
1943 if range_misses > 0 {
1944 self.stats
1945 .miss_range
1946 .fetch_add(range_misses as u64, Ordering::Relaxed);
1947 }
1948 if quick_filter_misses > 0 {
1949 self.stats
1950 .miss_amqf
1951 .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
1952 }
1953 if sst_misses > 0 {
1954 self.stats
1955 .miss_key
1956 .fetch_add(sst_misses as u64, Ordering::Relaxed);
1957 }
1958 }
1959
1960 if empty_cells == 0 {
1961 break;
1962 }
1963 }
1964 let mut deleted = 0;
1965 let mut not_found = 0;
1966 let mut result_size = 0;
1967 let mut results = vec![None; keys.len()];
1968 for (hash, index, result) in cells {
1969 if let Some(result) = result {
1970 inner.accessed_key_hashes[family].insert(hash);
1971 let result = match result {
1972 LookupValue::Deleted => {
1973 #[cfg(feature = "stats")]
1974 self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1975 deleted += 1;
1976 None
1977 }
1978 LookupValue::Slice { value } => {
1979 #[cfg(feature = "stats")]
1980 self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1981 result_size += value.len();
1982 Some(value)
1983 }
1984 LookupValue::Blob { sequence_number } => {
1985 #[cfg(feature = "stats")]
1986 self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1987 let blob = self.read_blob(sequence_number)?;
1988 result_size += blob.len();
1989 Some(blob)
1990 }
1991 };
1992 results[index] = result;
1993 } else {
1994 #[cfg(feature = "stats")]
1995 self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1996 not_found += 1;
1997 }
1998 }
1999 span.record("not_found", not_found);
2000 span.record("deleted", deleted);
2001 span.record("result_size", result_size);
2002 Ok(results)
2003 }
2004
/// Returns a snapshot of the database statistics.
///
/// Combines the number of loaded meta files and their SST entries, statistics for the
/// key and value block caches, and the hit/miss counters. `hits` is the sum of the
/// deleted, small-value and blob hit counters. Counters are read with relaxed ordering,
/// so the snapshot is not guaranteed to be consistent across counters.
#[cfg(feature = "stats")]
pub fn statistics(&self) -> Statistics {
    let state = self.inner.read();
    let counters = &self.stats;
    let hits_deleted = counters.hits_deleted.load(Ordering::Relaxed);
    let hits_small = counters.hits_small.load(Ordering::Relaxed);
    let hits_blob = counters.hits_blob.load(Ordering::Relaxed);
    Statistics {
        meta_files: state.meta_files.len(),
        sst_files: state
            .meta_files
            .iter()
            .map(|meta_file| meta_file.entries().len())
            .sum(),
        key_block_cache: CacheStatistics::new(self.key_block_cache()),
        value_block_cache: CacheStatistics::new(self.value_block_cache()),
        hits: hits_deleted + hits_small + hits_blob,
        misses: counters.miss_global.load(Ordering::Relaxed),
        miss_family: counters.miss_family.load(Ordering::Relaxed),
        miss_range: counters.miss_range.load(Ordering::Relaxed),
        miss_amqf: counters.miss_amqf.load(Ordering::Relaxed),
        miss_key: counters.miss_key.load(Ordering::Relaxed),
    }
}
2024
2025 pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
2026 Ok(self
2027 .inner
2028 .read()
2029 .meta_files
2030 .iter()
2031 .rev()
2032 .map(|meta_file| {
2033 let entries = meta_file
2034 .entries()
2035 .iter()
2036 .map(|entry| {
2037 let amqf = entry.raw_amqf(meta_file.amqf_data());
2038 MetaFileEntryInfo {
2039 sequence_number: entry.sequence_number(),
2040 min_hash: entry.min_hash(),
2041 max_hash: entry.max_hash(),
2042 sst_size: entry.size(),
2043 flags: entry.flags(),
2044 amqf_size: entry.amqf_size(),
2045 amqf_entries: amqf.len(),
2046 block_count: entry.block_count(),
2047 }
2048 })
2049 .collect();
2050 MetaFileInfo {
2051 sequence_number: meta_file.sequence_number(),
2052 family: meta_file.family(),
2053 obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
2054 entries,
2055 }
2056 })
2057 .collect())
2058 }
2059
2060 pub fn shutdown(&self) -> Result<()> {
2063 #[cfg(feature = "print_stats")]
2064 println!("{:#?}", self.statistics());
2065 self.retry_deferred_deletions();
2066 Ok(())
2067 }
2068
/// Attempts to delete the file `<dir>/<seq:08>.<ext>` for every sequence number in `seqs`.
///
/// Returns a lazy iterator over the sequence numbers whose file could **not** be
/// removed, so the caller can retry them later. Nothing is deleted until the iterator
/// is consumed.
///
/// A file that does not exist is treated as successfully deleted: the goal (the file
/// being gone) is already met, and reporting it as a failure would make callers retry
/// it forever.
fn try_delete_files<'a>(
    dir: &'a Path,
    seqs: &'a [u32],
    ext: &'a str,
) -> impl Iterator<Item = u32> + 'a {
    seqs.iter().copied().filter(move |&seq| {
        fs::remove_file(dir.join(format!("{seq:08}.{ext}")))
            // Only genuine failures (e.g. the file is still in use) are reported.
            .is_err_and(|err| err.kind() != std::io::ErrorKind::NotFound)
    })
}
2080
2081 fn retry_deferred_deletions(&self) {
2086 let mut deferred = self.deferred_deletions.lock();
2087 deferred.retain(|entry| {
2088 let (seq, ext) = match *entry {
2089 DeferredDeletion::Sst(seq) => (seq, "sst"),
2090 DeferredDeletion::Meta(seq) => (seq, "meta"),
2091 DeferredDeletion::Blob(seq) => (seq, "blob"),
2092 };
2093 fs::remove_file(self.path.join(format!("{seq:08}.{ext}"))).is_err()
2095 });
2096 }
2097}
2098
/// Renders the hash range `min..=max` as a fixed-width ASCII bar followed by the bounds
/// in hexadecimal, for use in the compaction log.
///
/// The bar is 100 characters wide and maps the whole `u64` hash space onto its columns:
/// `[` marks the column of `min`, `]` the column of `max`, `=` fills the columns in
/// between and `O` is used when both bounds fall into the same column. The bar is
/// followed by `" | {min:016x}-{max:016x}"`.
///
/// Columns are clamped to the last cell so that a bound of `u64::MAX` is still drawn
/// (unclamped it would scale to column 100, one past the end of the bar).
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // " | " + 16 hex digits + "-" + 16 hex digits
    const SUFFIX_LEN: usize = 3 + 16 + 1 + 16;

    // Scale a hash to its column; the 128-bit intermediate avoids overflow.
    let column = |hash: u64| -> usize {
        let scaled = hash as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128;
        (scaled as usize).min(DISPLAY_SIZE - 1)
    };
    let start_pos = column(min);
    let end_pos = column(max);

    let mut range_str = String::with_capacity(DISPLAY_SIZE + SUFFIX_LEN);
    range_str.extend((0..DISPLAY_SIZE).map(|i| {
        if i == start_pos && i == end_pos {
            'O'
        } else if i == start_pos {
            '['
        } else if i == end_pos {
            ']'
        } else if i > start_pos && i < end_pos {
            '='
        } else {
            ' '
        }
    }));
    write!(range_str, " | {min:016x}-{max:016x}").expect("writing to a String cannot fail");
    range_str
}
2122
/// Description of one loaded meta file, as produced by `meta_info`.
pub struct MetaFileInfo {
    /// Sequence number of the meta file.
    pub sequence_number: u32,
    /// Family the meta file belongs to.
    pub family: u32,
    /// Copy of the meta file's list of obsolete SST files
    /// (presumably their sequence numbers - confirm against `MetaFile`).
    pub obsolete_sst_files: Vec<u32>,
    /// One description per entry of the meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
2129
/// Description of a single meta file entry, as produced by `meta_info`.
pub struct MetaFileEntryInfo {
    /// Sequence number reported by the entry.
    pub sequence_number: u32,
    /// Minimum hash reported by the entry.
    pub min_hash: u64,
    /// Maximum hash reported by the entry.
    pub max_hash: u64,
    /// AMQF size reported by the entry.
    pub amqf_size: u32,
    /// Length of the entry's raw AMQF as returned by `raw_amqf`.
    /// NOTE(review): may be a byte length rather than an entry count - confirm.
    pub amqf_entries: usize,
    /// Size reported by the entry (presumably of the SST file in bytes - confirm).
    pub sst_size: u64,
    /// Flags reported by the entry.
    pub flags: MetaEntryFlags,
    /// Block count reported by the entry.
    pub block_count: u16,
}