1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{self, File, OpenOptions, ReadDir},
5 io::{BufWriter, Write},
6 mem::take,
7 ops::RangeInclusive,
8 path::{Path, PathBuf},
9 sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use dashmap::DashSet;
15use jiff::Timestamp;
16use memmap2::Mmap;
17use nohash_hasher::BuildNoHashHasher;
18use parking_lot::{Mutex, RwLock};
19use smallvec::SmallVec;
20use tracing::span::EnteredSpan;
21
22pub use crate::compaction::selector::CompactConfig;
23use crate::{
24 DbConfig, FamilyKind, QueryKey,
25 arc_bytes::ArcBytes,
26 compaction::selector::{Compactable, get_merge_segments},
27 compression::{checksum_block, decompress_into_arc},
28 constants::{
29 DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
30 MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
31 },
32 key::{StoreKey, hash_key},
33 lookup_entry::{IterValue, LookupEntry, LookupValue},
34 merge_iter::MergeIter,
35 meta_file::{MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
36 meta_file_builder::MetaFileBuilder,
37 mmap_helper::advise_mmap_for_persistence,
38 parallel_scheduler::ParallelScheduler,
39 rc_bytes::RcBytes,
40 sst_filter::SstFilter,
41 static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFileIter},
42 static_sorted_file_builder::{StaticSortedFileBuilderMeta, StreamingSstWriter},
43 write_batch::{FinishResult, WriteBatch},
44};
45
/// Point-in-time snapshot of one block cache's behavior (only with the
/// `stats` feature). Built by [`CacheStatistics::new`].
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Fraction of lookups served from the cache: hits / (hits + misses).
    pub hit_rate: f32,
    /// How full the cache is: total weight / capacity.
    pub fill: f32,
    /// Number of items currently cached.
    pub items: usize,
    /// Total weight of the cached items.
    pub size: u64,
    /// Cumulative cache hits.
    pub hits: u64,
    /// Cumulative cache misses.
    pub misses: u64,
}
56
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Builds a statistics snapshot from a `quick_cache` cache.
    ///
    /// Fix: `hit_rate` was computed as `hits / (hits + misses)` without a
    /// zero check, which yields `NaN` for a cache that has never been
    /// queried; it now reports `0.0` in that case.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        let total = hits + misses;
        Self {
            // Avoid NaN when the cache has never been queried.
            hit_rate: if total == 0 {
                0.0
            } else {
                hits as f32 / total as f32
            },
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
80
/// Aggregated database-wide statistics (only with the `stats` feature).
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of meta files currently open.
    pub meta_files: usize,
    /// Number of SST files referenced by those meta files.
    pub sst_files: usize,
    /// Snapshot of the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Snapshot of the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Total lookup hits.
    pub hits: u64,
    /// Total lookup misses.
    pub misses: u64,
    // The miss_* breakdown mirrors the counters in `TrackedStats`;
    // presumably each counts lookups rejected by the corresponding check
    // (family match, hash range, AMQF filter, key search) — confirm against
    // the lookup path.
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}
95
/// Internal atomic lookup counters behind the `stats` feature.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    // Hits, split by the kind of value that was found.
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    // Misses, split by which check rejected the lookup.
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}
108
/// State of the single exclusive write-operation slot
/// (see `acquire_write_operation`).
enum ActiveWriteState {
    /// A write operation is in progress; payload is its human-readable name,
    /// used in the "already active" error message.
    Active(&'static str),
    /// A failed write operation could not be rolled back; the database
    /// refuses all further writes (see `Drop for WriteOperationGuard`).
    Error,
}
118
/// A file deletion (identified by sequence number and kind) that failed
/// earlier and is queued for retry (see the `deferred_deletions` handling in
/// `commit`).
enum DeferredDeletion {
    Sst(u32),
    Meta(u32),
    Blob(u32),
}
129
/// RAII guard for the exclusive write-operation slot.
///
/// Dropped without `success()`, it deletes files newer than `seq_before` and
/// rolls CURRENT back; if that cleanup fails, the slot is poisoned with
/// `ActiveWriteState::Error`.
pub(crate) struct WriteOperationGuard<'a> {
    /// Shared write-operation slot this guard releases on drop.
    active: &'a Mutex<Option<ActiveWriteState>>,
    /// Database directory, needed for rollback cleanup.
    path: &'a Path,
    /// Sequence number when the operation started; files with a higher
    /// sequence number are treated as orphans of this operation.
    seq_before: u32,
    /// Set via `success()`; skips the rollback on drop.
    succeeded: bool,
}
148
impl WriteOperationGuard<'_> {
    /// Marks the write operation as completed successfully, so dropping the
    /// guard releases the slot without deleting any files.
    pub(crate) fn success(&mut self) {
        self.succeeded = true;
    }
}
157
158fn delete_orphan_files(path: &Path, seq_before: u32) -> Result<()> {
163 let mut current_file = OpenOptions::new()
167 .write(true)
168 .truncate(false)
169 .read(false)
170 .open(path.join("CURRENT"))?;
171 current_file.write_u32::<BE>(seq_before)?;
172 current_file.sync_all()?;
173
174 for entry in fs::read_dir(path)? {
175 let entry = entry?;
176 let path = entry.path();
177 if let Some(ext) = path.extension().and_then(|s| s.to_str())
178 && let Some(seq) = path
179 .file_stem()
180 .and_then(|s| s.to_str())
181 .and_then(|s| s.parse::<u32>().ok())
182 && seq > seq_before
183 {
184 match ext {
185 "sst" | "meta" | "blob" | "del" => fs::remove_file(&path)?,
186 _ => {}
187 }
188 }
189 }
190 Ok(())
191}
192
193impl Drop for WriteOperationGuard<'_> {
194 fn drop(&mut self) {
195 if self.succeeded {
196 *self.active.lock() = None;
198 return;
199 }
200
201 match delete_orphan_files(self.path, self.seq_before) {
204 Ok(()) => *self.active.lock() = None,
205 Err(_) => *self.active.lock() = Some(ActiveWriteState::Error),
206 }
207 }
208}
209
/// Handle to an on-disk key/value store organized into `FAMILIES` key
/// families, backed by meta/SST/blob files in a single directory.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    /// Scheduler used to parallelize file syncing, loading and compaction.
    parallel_scheduler: S,
    /// Directory containing all database files (CURRENT, LOG, *.sst, …).
    path: PathBuf,
    /// When true, all write entry points bail out.
    read_only: bool,
    /// Mutable state: open meta files and the current sequence number.
    inner: RwLock<Inner<FAMILIES>>,
    /// Cached "no meta files" flag, readable without locking `inner`.
    is_empty: AtomicBool,
    /// Slot enforcing a single concurrent write operation, or a poisoned
    /// error state after a failed rollback.
    active_write_operation: Mutex<Option<ActiveWriteState>>,
    /// File deletions that failed and are retried on later commits.
    deferred_deletions: Mutex<Vec<DeferredDeletion>>,
    /// Shared cache for SST key blocks.
    key_block_cache: BlockCache,
    /// Shared cache for SST value blocks.
    value_block_cache: BlockCache,
    /// Static configuration, including per-family settings.
    config: DbConfig<FAMILIES>,
    /// Lookup hit/miss counters (only with the `stats` feature).
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
241
/// State guarded by the `TurboPersistence::inner` lock.
struct Inner<const FAMILIES: usize> {
    /// Open meta files, kept in ascending sequence-number order (sorted on
    /// load; commits append their presorted new files).
    meta_files: Vec<MetaFile>,
    /// Highest sequence number in use; persisted in the CURRENT file.
    current_sequence_number: u32,
    /// Per-family sets of key hashes; drained into an AMQF filter by
    /// `commit_write_batch`.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}
253
/// Everything `commit` needs to atomically publish the result of a write
/// batch or a compaction.
pub struct CommitOptions {
    /// Newly written meta files as (sequence number, open file handle).
    new_meta_files: Vec<(u32, File)>,
    /// Newly written SST files as (sequence number, open file handle).
    new_sst_files: Vec<(u32, File)>,
    /// Newly written blob files as (sequence number, open file handle).
    new_blob_files: Vec<(u32, File)>,
    /// SST sequence numbers superseded by this commit.
    sst_seq_numbers_to_delete: Vec<u32>,
    /// Blob sequence numbers superseded by this commit.
    blob_seq_numbers_to_delete: Vec<u32>,
    /// Highest sequence number used by the new files.
    sequence_number: u32,
    /// Number of keys written, recorded in the LOG file.
    keys_written: u64,
}
263
impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
    /// Opens or creates a database at `path` with default config and a
    /// default scheduler.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Opens or creates a database at `path` with the given config and a
    /// default scheduler.
    pub fn open_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, config, Default::default())
    }

    /// Opens an existing database at `path` read-only with a default
    /// scheduler.
    pub fn open_read_only_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, config, Default::default())
    }
}
284
285impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
286 fn new(
287 path: PathBuf,
288 read_only: bool,
289 parallel_scheduler: S,
290 config: DbConfig<FAMILIES>,
291 ) -> Self {
292 Self {
293 parallel_scheduler,
294 path,
295 read_only,
296 inner: RwLock::new(Inner {
297 meta_files: Vec::new(),
298 current_sequence_number: 0,
299 accessed_key_hashes: [(); FAMILIES]
300 .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
301 }),
302 is_empty: AtomicBool::new(true),
303 active_write_operation: Mutex::new(None),
304 deferred_deletions: Mutex::new(Vec::new()),
305 key_block_cache: BlockCache::with(
306 KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
307 KEY_BLOCK_CACHE_SIZE,
308 Default::default(),
309 Default::default(),
310 Default::default(),
311 ),
312 value_block_cache: BlockCache::with(
313 VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
314 VALUE_BLOCK_CACHE_SIZE,
315 Default::default(),
316 Default::default(),
317 Default::default(),
318 ),
319 config,
320 #[cfg(feature = "stats")]
321 stats: TrackedStats::default(),
322 }
323 }
324
    /// Opens or creates a database at `path` with the default config and the
    /// given scheduler.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, DbConfig::default(), parallel_scheduler)
    }
332
    /// Opens or creates a read-write database at `path` with the given
    /// config and scheduler, loading any existing state from disk.
    pub fn open_with_config_and_parallel_scheduler(
        path: PathBuf,
        config: DbConfig<FAMILIES>,
        parallel_scheduler: S,
    ) -> Result<Self> {
        let mut db = Self::new(path, false, parallel_scheduler, config);
        db.open_directory(false)?;
        Ok(db)
    }
343
344 fn open_read_only_with_parallel_scheduler(
347 path: PathBuf,
348 config: DbConfig<FAMILIES>,
349 parallel_scheduler: S,
350 ) -> Result<Self> {
351 let mut db = Self::new(path, true, parallel_scheduler, config);
352 db.open_directory(false)?;
353 Ok(db)
354 }
355
356 fn open_directory(&mut self, read_only: bool) -> Result<()> {
358 match fs::read_dir(&self.path) {
359 Ok(entries) => {
360 if !self
361 .load_directory(entries, read_only)
362 .context("Loading persistence directory failed")?
363 {
364 if read_only {
365 bail!("Failed to open database");
366 }
367 self.init_directory()
368 .context("Initializing persistence directory failed")?;
369 }
370 Ok(())
371 }
372 Err(e) => {
373 if !read_only && e.kind() == std::io::ErrorKind::NotFound {
374 self.create_and_init_directory()
375 .context("Creating and initializing persistence directory failed")?;
376 Ok(())
377 } else {
378 Err(e).context("Failed to open database")
379 }
380 }
381 }
382 }
383
    /// Creates the database directory (and parents) and initializes it with
    /// an empty CURRENT file.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
389
390 fn init_directory(&mut self) -> Result<()> {
392 let mut current = File::create(self.path.join("CURRENT"))?;
393 current.write_u32::<BE>(0)?;
394 current.flush()?;
395 Ok(())
396 }
397
    /// Scans the database directory and loads all committed state.
    ///
    /// Returns `Ok(false)` if no CURRENT file exists (caller initializes a
    /// fresh database); `Ok(true)` once `self.inner` is populated. Unless
    /// `read_only`, this also performs crash recovery: files with a sequence
    /// number above CURRENT and files listed in `.del` files are removed.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    // No CURRENT file means no database yet.
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        // CURRENT stores the highest committed sequence number (big-endian).
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Every database file is named `{seq:08}.{ext}`.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Leftover from an uncommitted write operation.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A `.del` file lists sequence numbers whose
                            // sst/meta/blob files were logically deleted but
                            // may still exist on disk after a crash.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            // Once every listed file is gone, the `.del`
                            // marker itself is no longer needed.
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // Referenced from meta files; nothing to load
                            // eagerly here.
                        }
                        _ => {
                            // Hidden files (dotfiles) are tolerated; anything
                            // else is unexpected.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read above.
                    }
                    Some("LOG") => {
                        // Human-readable commit log; not loaded.
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        // Drop meta files that a `.del` marker (seen in any order) declared
        // deleted, then open the remainder in parallel, ordered by sequence.
        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Newest-first filtering so later meta files shadow older ones.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        self.is_empty
            .store(meta_files.is_empty(), Ordering::Relaxed);
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
514
    /// Reads, verifies and decompresses the blob file with the given
    /// sequence number.
    ///
    /// Blob file layout: 4-byte uncompressed length (BE), 4-byte checksum
    /// (BE) over the remaining bytes, then the compressed payload.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcBytes> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let file = File::open(&path)
            .with_context(|| format!("Failed to open blob file {}", path.display()))?;
        // SAFETY: mmap of a file; soundness relies on the file not being
        // truncated while mapped (database files are immutable once written
        // — TODO confirm this invariant holds for blob files).
        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
            format!(
                "Failed to mmap blob file {} ({} bytes)",
                path.display(),
                file.metadata().map(|m| m.len()).unwrap_or(0)
            )
        })?;
        // Blobs are consumed front-to-back once: hint sequential access and
        // eager read-ahead to the OS.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        advise_mmap_for_persistence(&mmap)?;
        let mut reader = &mmap[..];
        let uncompressed_length = reader
            .read_u32::<BE>()
            .context("Failed to read uncompressed length from blob file")?;
        let expected_checksum = reader.read_u32::<BE>()?;

        // Verify integrity of the compressed payload before decompressing.
        let actual_checksum = checksum_block(reader);
        if actual_checksum != expected_checksum {
            bail!(
                "Cache corruption detected: checksum mismatch in blob file {:08}.blob (expected \
                 {:08x}, got {:08x})",
                seq,
                expected_checksum,
                actual_checksum
            );
        }

        let buffer = decompress_into_arc(uncompressed_length, reader)?;
        Ok(ArcBytes::from(buffer))
    }
554
    /// Returns true when the database has no meta files (and therefore no
    /// committed data). Cheap: reads a cached atomic flag.
    pub fn is_empty(&self) -> bool {
        self.is_empty.load(Ordering::Relaxed)
    }
559
560 pub fn has_unrecoverable_write_error(&self) -> bool {
563 matches!(
564 *self.active_write_operation.lock(),
565 Some(ActiveWriteState::Error)
566 )
567 }
568
569 fn acquire_write_operation(&self, name: &'static str) -> Result<WriteOperationGuard<'_>> {
573 if self.read_only {
574 bail!("Cannot perform write operations on a read-only database");
575 }
576 let mut slot = self.active_write_operation.lock();
577 match &*slot {
578 Some(ActiveWriteState::Active(active_name)) => {
579 bail!(
580 "Another {active_name} is already active (only a single write operation is \
581 allowed at a time)"
582 );
583 }
584 Some(ActiveWriteState::Error) => {
585 bail!(
586 "A previous write operation failed with an unrecoverable error; no further \
587 writes are possible"
588 );
589 }
590 None => {}
591 }
592 *slot = Some(ActiveWriteState::Active(name));
593 drop(slot); let seq_before = self.inner.read().current_sequence_number;
595 Ok(WriteOperationGuard {
596 active: &self.active_write_operation,
597 path: &self.path,
598 seq_before,
599 succeeded: false,
600 })
601 }
602
    /// Starts a new write batch. Only one write operation (write batch or
    /// compaction) may be active at a time; the returned batch holds the
    /// write guard until it is committed or dropped.
    pub fn write_batch<K: StoreKey + Send + Sync>(&self) -> Result<WriteBatch<'_, K, S, FAMILIES>> {
        let guard = self.acquire_write_operation("write batch")?;
        // New files created by the batch are numbered after this point.
        let current = guard.seq_before;
        Ok(WriteBatch::new(
            guard,
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
            self.config.family_configs,
        ))
    }
619
    /// Clears both block caches and every open meta file's internal cache.
    pub fn clear_cache(&self) {
        self.key_block_cache.clear();
        self.value_block_cache.clear();
        // Write lock required: meta-file caches are cleared in place.
        for meta in self.inner.write().meta_files.iter_mut() {
            meta.clear_cache();
        }
    }
628
    /// Clears only the key/value block caches, leaving per-meta-file caches
    /// intact.
    pub fn clear_block_caches(&self) {
        self.key_block_cache.clear();
        self.value_block_cache.clear();
    }
634
    /// Eagerly initializes the SST cache of every open meta file, e.g. to
    /// front-load that cost before latency-sensitive reads.
    pub fn prepare_all_sst_caches(&self) {
        for meta in self.inner.write().meta_files.iter_mut() {
            meta.prepare_sst_cache();
        }
    }
642
643 fn open_log(&self) -> Result<BufWriter<File>> {
644 if self.read_only {
645 unreachable!("Only write operations can open the log file");
646 }
647 let log_path = self.path.join("LOG");
648 let log_file = OpenOptions::new()
649 .create(true)
650 .append(true)
651 .open(log_path)?;
652 Ok(BufWriter::new(log_file))
653 }
654
    /// Finishes the given write batch and commits its files to the
    /// database.
    pub fn commit_write_batch<K: StoreKey + Send + Sync>(
        &self,
        mut write_batch: WriteBatch<'_, K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            // Build an AMQF over this family's recorded key hashes
            // (presumably keys accessed since the last batch — see
            // `Inner::accessed_key_hashes`; confirm against the read path).
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // Size the filter with ~5% headroom over the element count.
            let initial_capacity = set.len() * 20 / 19;
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // Drain the set into the filter: the closure returns `false`,
            // so `retain` removes every entry after inserting it.
            set.retain(|hash| {
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        // Only mark success after the commit is durable; otherwise the write
        // guard rolls back the new files when the batch is dropped.
        write_batch.mark_succeeded();
        Ok(())
    }
709
    /// Atomically publishes a set of new files (and deletions) as the next
    /// database state.
    ///
    /// Ordering matters for crash safety: (1) fsync all new files and the
    /// directory, (2) write a `.del` file listing superseded files, (3)
    /// update CURRENT, (4) update in-memory state, (5) physically delete
    /// superseded files (retried later on failure). A crash before (3)
    /// leaves the old state intact; after (3), recovery in `load_directory`
    /// finishes the deletions.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let sync_span = tracing::trace_span!("sync new files").entered();

        // Work items for the parallel fsync pass below.
        enum SyncItem {
            Meta(u32, File),
            Sst(File),
            Blob(u32, File),
        }
        enum SyncResult {
            Meta(MetaFile),
            Sst,
            Blob(u32, File),
        }

        let mut sync_items: Vec<SyncItem> =
            Vec::with_capacity(new_meta_files.len() + new_sst_files.len() + new_blob_files.len());
        for (seq, file) in new_meta_files {
            sync_items.push(SyncItem::Meta(seq, file));
        }
        for (_, file) in new_sst_files {
            sync_items.push(SyncItem::Sst(file));
        }
        for (seq, file) in new_blob_files {
            sync_items.push(SyncItem::Blob(seq, file));
        }

        // Phase 1: fsync every new file in parallel; meta files are also
        // re-opened as `MetaFile` for the in-memory state.
        let results: Vec<SyncResult> = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(sync_items, |item| match item {
                SyncItem::Meta(seq, file) => {
                    file.sync_data()?;
                    let meta_file = MetaFile::open(&self.path, seq)?;
                    Ok(SyncResult::Meta(meta_file))
                }
                SyncItem::Sst(file) => {
                    file.sync_data()?;
                    Ok(SyncResult::Sst)
                }
                SyncItem::Blob(seq, file) => {
                    file.sync_data()?;
                    Ok(SyncResult::Blob(seq, file))
                }
            })?;

        let mut new_meta_files: Vec<MetaFile> = Vec::new();
        let mut new_blob_files: Vec<(u32, File)> = Vec::new();
        for result in results {
            match result {
                SyncResult::Meta(mf) => new_meta_files.push(mf),
                SyncResult::Sst => {}
                SyncResult::Blob(seq, file) => new_blob_files.push((seq, file)),
            }
        }

        // Newest-first filtering so later meta files shadow older ones.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        // Sync the directory itself so the new file entries are durable.
        File::open(&self.path)?.sync_data()?;
        drop(sync_span);

        // Snapshot of the new meta files' contents for the LOG output below
        // (collected before ownership moves into the shared state).
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();
        let entries_to_remove;

        {
            let inner = self.inner.read();

            // Entries in existing meta files that the new files shadow; the
            // actual removal happens later under the write lock.
            entries_to_remove = inner
                .meta_files
                .iter()
                .rev()
                .map(|meta_file| sst_filter.apply_filter_collect(meta_file))
                .collect::<Vec<_>>();

            for meta_file in new_meta_files.iter().rev() {
                let should_remove = sst_filter.apply_and_get_remove(meta_file);
                debug_assert!(
                    !should_remove,
                    "newly created meta file should never be a candidate for removal"
                );
            }
            // Existing meta files that became fully shadowed can be deleted.
            for i in (0..inner.meta_files.len()).rev() {
                if sst_filter.apply_and_get_remove(&inner.meta_files[i]) {
                    meta_seq_numbers_to_delete.push(inner.meta_files[i].sequence_number());
                }
            }

            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
        }
        // The `.del` file consumes one extra sequence number.
        if has_delete_file {
            seq += 1;
        }

        self.parallel_scheduler.block_in_place(|| {
            // Phase 2: record pending deletions durably before CURRENT moves,
            // so crash recovery can finish them.
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_data()?;
            }

            // Phase 3: advance CURRENT — this is the commit point.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_data()?;

            // LOG output is best-effort: a failure here must not fail the
            // already-durable commit, so errors are only printed.
            if let Err(e) = (|| {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                // Helper for the repetitive "15 sequence numbers per line"
                // LOG sections below.
                fn write_seq_numbers<W: std::io::Write, T>(
                    log: &mut W,
                    items: &[T],
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()> {
                    for chunk in items.chunks(15) {
                        write!(log, " | |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, &new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    &blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(
                    &mut log,
                    &sst_seq_numbers_to_delete,
                    "SST DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(
                    &mut log,
                    &meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                anyhow::Ok(())
            })() {
                eprintln!("turbo-persistence: failed to write LOG after commit {seq:08}: {e:#}");
            }

            anyhow::Ok(())
        })?;

        // Phase 4: update the in-memory state under the write lock.
        {
            let mut inner = self.inner.write();

            // entries_to_remove was collected over .rev(), so re-reverse to
            // line it up with the forward iteration here.
            for (meta_file, to_remove) in inner
                .meta_files
                .iter_mut()
                .zip(entries_to_remove.into_iter().rev())
            {
                if !to_remove.is_empty() {
                    meta_file.retain_entries(|seq| !to_remove.contains(&seq));
                }
            }

            inner.meta_files.append(&mut new_meta_files);
            if !meta_seq_numbers_to_delete.is_empty() {
                let to_delete: HashSet<u32> = meta_seq_numbers_to_delete.iter().copied().collect();
                inner
                    .meta_files
                    .retain(|meta| !to_delete.contains(&meta.sequence_number()));
            }
            inner.current_sequence_number = seq;
            self.is_empty
                .store(inner.meta_files.is_empty(), Ordering::Relaxed);
        }

        // Phase 5: physically delete superseded files; failures are queued
        // and retried (the `.del` file guarantees eventual cleanup).
        self.deferred_deletions.lock().extend(
            Self::try_delete_files(&self.path, &sst_seq_numbers_to_delete, "sst")
                .map(DeferredDeletion::Sst)
                .chain(
                    Self::try_delete_files(&self.path, &meta_seq_numbers_to_delete, "meta")
                        .map(DeferredDeletion::Meta),
                )
                .chain(
                    Self::try_delete_files(&self.path, &blob_seq_numbers_to_delete, "blob")
                        .map(DeferredDeletion::Blob),
                ),
        );

        self.retry_deferred_deletions();

        // Optional full state dump to LOG, best-effort.
        #[cfg(feature = "verbose_log")]
        {
            let _: Result<(), _> = (|| -> anyhow::Result<()> {
                let mut log = self.open_log()?;
                writeln!(log, "New database state:")?;
                writeln!(log, "FAM | META SEQ | SST SEQ FLAGS | RANGE")?;
                let inner = self.inner.read();
                // Distinct families in first-seen order.
                let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                    let mut set = HashSet::new();
                    move |family| set.insert(*family)
                });
                for family in families {
                    for meta in inner.meta_files.iter() {
                        if meta.family() != family {
                            continue;
                        }
                        let meta_seq = meta.sequence_number();
                        for entry in meta.entries().iter() {
                            let seq = entry.sequence_number();
                            let range = entry.range();
                            writeln!(
                                log,
                                "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                entry.flags(),
                                range_to_str(range.min_hash, range.max_hash)
                            )?;
                        }
                    }
                }
                Ok(())
            })();
        }

        Ok(())
    }
1083
1084 pub fn full_compact(&self) -> Result<()> {
1087 self.compact(&CompactConfig {
1088 min_merge_count: 2,
1089 optimal_merge_count: usize::MAX,
1090 max_merge_count: usize::MAX,
1091 max_merge_bytes: u64::MAX,
1092 min_merge_duplication_bytes: 0,
1093 optimal_merge_duplication_bytes: u64::MAX,
1094 max_merge_segment_count: usize::MAX,
1095 })?;
1096 Ok(())
1097 }
1098
    /// Runs a compaction round limited by `compact_config`. Returns `true`
    /// when files were merged and a new state was committed.
    ///
    /// Takes the exclusive write slot, so it cannot run concurrently with a
    /// write batch.
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        let mut guard = self.acquire_write_operation("compaction")?;

        // NOTE(review): drops all cached blocks up front — presumably to
        // free memory for the merge; confirm intent.
        self.clear_cache();

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            // Read lock only: compaction writes brand-new files and leaves
            // the current state untouched until commit.
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        // Success only after the commit is durable; an earlier drop of the
        // guard would delete the freshly written files.
        guard.success();
        Ok(has_changes)
    }
1152
1153 fn compact_internal(
1155 &self,
1156 meta_files: &[MetaFile],
1157 sequence_number: &AtomicU32,
1158 new_meta_files: &mut Vec<(u32, File)>,
1159 new_sst_files: &mut Vec<(u32, File)>,
1160 sst_seq_numbers_to_delete: &mut Vec<u32>,
1161 blob_seq_numbers_to_delete: &mut Vec<u32>,
1162 keys_written: &mut u64,
1163 compact_config: &CompactConfig,
1164 ) -> Result<()> {
1165 if meta_files.is_empty() {
1166 return Ok(());
1167 }
1168
1169 struct SstWithRange {
1170 meta_index: usize,
1171 index_in_meta: u32,
1172 seq: u32,
1173 range: StaticSortedFileRange,
1174 size: u64,
1175 flags: MetaEntryFlags,
1176 }
1177
1178 impl Compactable for SstWithRange {
1179 fn range(&self) -> RangeInclusive<u64> {
1180 self.range.min_hash..=self.range.max_hash
1181 }
1182
1183 fn size(&self) -> u64 {
1184 self.size
1185 }
1186
1187 fn category(&self) -> u8 {
1188 if self.flags.cold() { 1 } else { 0 }
1191 }
1192 }
1193
1194 let ssts_with_ranges = meta_files
1195 .iter()
1196 .enumerate()
1197 .flat_map(|(meta_index, meta)| {
1198 meta.entries()
1199 .iter()
1200 .enumerate()
1201 .map(move |(index_in_meta, entry)| SstWithRange {
1202 meta_index,
1203 index_in_meta: index_in_meta as u32,
1204 seq: entry.sequence_number(),
1205 range: entry.range(),
1206 size: entry.size(),
1207 flags: entry.flags(),
1208 })
1209 })
1210 .collect::<Vec<_>>();
1211
1212 let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
1213
1214 for sst in ssts_with_ranges {
1215 sst_by_family[sst.range.family as usize].push(sst);
1216 }
1217
1218 let path = &self.path;
1219
1220 let log_mutex = Mutex::new(());
1221
1222 struct PartialResultPerFamily {
1223 new_meta_file: Option<(u32, File)>,
1224 new_sst_files: Vec<(u32, File)>,
1225 sst_seq_numbers_to_delete: Vec<u32>,
1226 blob_seq_numbers_to_delete: Vec<u32>,
1227 keys_written: u64,
1228 }
1229
1230 let mut compact_config = compact_config.clone();
1231 let merge_jobs = sst_by_family
1232 .into_iter()
1233 .enumerate()
1234 .filter_map(|(family, ssts_with_ranges)| {
1235 if compact_config.max_merge_segment_count == 0 {
1236 return None;
1237 }
1238 let (merge_jobs, real_merge_job_size) =
1239 get_merge_segments(&ssts_with_ranges, &compact_config);
1240 compact_config.max_merge_segment_count -= real_merge_job_size;
1241 Some((family, ssts_with_ranges, merge_jobs))
1242 })
1243 .collect::<Vec<_>>();
1244
1245 let result = self
1246 .parallel_scheduler
1247 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
1248 merge_jobs,
1249 |(family, ssts_with_ranges, merge_jobs)| {
1250 let family = family as u32;
1251
1252 if merge_jobs.is_empty() {
1253 return Ok(PartialResultPerFamily {
1254 new_meta_file: None,
1255 new_sst_files: Vec::new(),
1256 sst_seq_numbers_to_delete: Vec::new(),
1257 blob_seq_numbers_to_delete: Vec::new(),
1258 keys_written: 0,
1259 });
1260 }
1261
1262 let used_key_hashes: Option<qfilter::Filter> = {
1267 let filters: Vec<qfilter::FilterRef<'_>> = meta_files
1268 .iter()
1269 .filter(|m| m.family() == family)
1270 .filter_map(|meta_file| {
1271 meta_file.deserialize_used_key_hashes_amqf().transpose()
1272 })
1273 .collect::<Result<Vec<_>>>()?
1274 .into_iter()
1275 .filter(|amqf| !amqf.is_empty())
1276 .collect();
1277 if filters.is_empty() {
1278 None
1279 } else if filters.len() == 1 {
1280 Some(filters[0].to_owned())
1282 } else {
1283 let total_len: u64 = filters.iter().map(|f| f.len()).sum();
1284 let mut merged =
1287 qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
1288 .expect("Failed to create merged AMQF filter");
1289 for filter in &filters {
1290 merged
1291 .merge(false, filter)
1292 .expect("Failed to merge AMQF filters");
1293 }
1294 merged.shrink_to_fit();
1295 Some(merged)
1296 }
1297 };
1298
1299 let sst_seq_numbers_to_delete = merge_jobs
1301 .iter()
1302 .filter(|l| l.len() > 1)
1303 .flat_map(|l| l.iter().copied())
1304 .map(|index| ssts_with_ranges[index].seq)
1305 .collect::<Vec<_>>();
1306
1307 let span = tracing::trace_span!(
1309 "merge files",
1310 family = self.config.family_configs[family as usize].name
1311 );
1312 enum PartialMergeResult<'l> {
1313 Merged {
1314 new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1315 blob_seq_numbers_to_delete: Vec<u32>,
1316 keys_written: u64,
1317 indices: SmallVec<[usize; 1]>,
1318 },
1319 Move {
1320 seq: u32,
1321 meta: StaticSortedFileBuilderMeta<'l>,
1322 },
1323 }
1324 let merge_result = self
1325 .parallel_scheduler
1326 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
1327 let _span = span.clone().entered();
1328 if indices.len() == 1 {
1329 let index = indices[0];
1331 let meta_index = ssts_with_ranges[index].meta_index;
1332 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1333 let meta_file = &meta_files[meta_index];
1334 let entry = meta_file.entry(index_in_meta);
1335 let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
1336 let meta = StaticSortedFileBuilderMeta {
1337 min_hash: entry.min_hash(),
1338 max_hash: entry.max_hash(),
1339 amqf,
1340 block_count: entry.block_count(),
1341 size: entry.size(),
1342 flags: entry.flags(),
1343 entries: 0,
1344 };
1345 return Ok(PartialMergeResult::Move {
1346 seq: entry.sequence_number(),
1347 meta,
1348 });
1349 }
1350
1351 let iters = indices
1355 .iter()
1356 .map(|&index| {
1357 let meta_index = ssts_with_ranges[index].meta_index;
1358 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1359 let entry = meta_files[meta_index].entry(index_in_meta);
1360 StaticSortedFileIter::open(path, entry.sst_metadata())
1361 })
1362 .collect::<Result<Vec<_>>>()?;
1363
1364 let iter = MergeIter::new(iters.into_iter())?;
1365
1366 let mut blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
1367
1368 struct Collector {
1369 writer: Option<(u32, StreamingSstWriter<LookupEntry>)>,
1376 flags: MetaEntryFlags,
1377 new_sst_files:
1378 Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1379 last_hash: Option<u64>,
1382 }
1383 impl Collector {
1384 fn new(flags: MetaEntryFlags) -> Self {
1385 Self {
1386 writer: None,
1387 flags,
1388 new_sst_files: Vec::new(),
1389 last_hash: None,
1390 }
1391 }
1392
1393 fn ensure_writer(
1395 &mut self,
1396 path: &Path,
1397 sequence_number: &AtomicU32,
1398 ) -> Result<&mut StreamingSstWriter<LookupEntry>>
1399 {
1400 if self.writer.is_none() {
1401 let seq =
1402 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1403 let sst_path = path.join(format!("{seq:08}.sst"));
1404 let writer = StreamingSstWriter::new(
1405 &sst_path,
1406 self.flags,
1407 MAX_ENTRIES_PER_COMPACTED_FILE as u64,
1408 )?;
1409 self.writer = Some((seq, writer));
1410 }
1411 Ok(&mut self.writer.as_mut().unwrap().1)
1412 }
1413
1414 fn close_sst_file(&mut self, keys_written: &mut u64) -> Result<()> {
1418 if let Some((seq, writer)) = self.writer.take() {
1419 let _span =
1420 tracing::trace_span!("close merged sst file").entered();
1421 let (meta, file) = writer.close()?;
1422 *keys_written += meta.entries;
1423 self.new_sst_files.push((seq, file, meta));
1424 }
1425 Ok(())
1426 }
1427
1428 fn add_entry(
1432 &mut self,
1433 entry: LookupEntry,
1434 path: &Path,
1435 sequence_number: &AtomicU32,
1436 keys_written: &mut u64,
1437 ) -> Result<()> {
1438 let key_changed = self.last_hash != Some(entry.hash);
1439 if key_changed
1442 && let Some((_, ref writer)) = self.writer
1443 && writer.is_full(
1444 MAX_ENTRIES_PER_COMPACTED_FILE,
1445 DATA_THRESHOLD_PER_COMPACTED_FILE,
1446 )
1447 {
1448 self.close_sst_file(keys_written)?;
1449 }
1450 self.last_hash = Some(entry.hash);
1451 let writer = self.ensure_writer(path, sequence_number)?;
1452 writer.add(entry)?;
1453 Ok(())
1454 }
1455 }
1456 #[cfg(debug_assertions)]
1457 impl Drop for Collector {
1458 fn drop(&mut self) {
1459 if !std::thread::panicking() {
1460 assert!(
1461 self.writer.is_none(),
1462 "Collector dropped with an open writer"
1463 );
1464 }
1465 }
1466 }
1467 let mut used_collector = Collector::new(MetaEntryFlags::WARM);
1468 let mut unused_collector = Collector::new(MetaEntryFlags::COLD);
1469 let mut current_key: Option<RcBytes> = None;
1470 let mut keys_written = 0;
1471
1472 let mut skip_remaining_for_this_key = false;
1479 let family_config = &self.config.family_configs[family as usize];
1480
1481 for entry in iter {
1482 let entry = entry?;
1483 if current_key.as_ref() != Some(&entry.key) {
1484 skip_remaining_for_this_key = false;
1486 current_key = Some(entry.key.clone());
1487 }
1488 if !skip_remaining_for_this_key {
1489 let is_used = used_key_hashes
1490 .as_ref()
1491 .is_some_and(|amqf| amqf.contains_fingerprint(entry.hash));
1492 let collector = if is_used {
1493 &mut used_collector
1494 } else {
1495 &mut unused_collector
1496 };
1497 match family_config.kind {
1498 FamilyKind::MultiValue => {
1499 if matches!(entry.value, IterValue::Deleted) {
1502 skip_remaining_for_this_key = true;
1503 }
1504 }
1505 FamilyKind::SingleValue => {
1506 skip_remaining_for_this_key = true;
1509 }
1510 }
1511 collector.add_entry(
1512 entry,
1513 path,
1514 sequence_number,
1515 &mut keys_written,
1516 )?;
1517 } else {
1518 if let IterValue::Blob { sequence_number } = &entry.value {
1522 blob_seq_numbers_to_delete.push(*sequence_number);
1523 }
1524 }
1525 }
1526
1527 used_collector.close_sst_file(&mut keys_written)?;
1529 unused_collector.close_sst_file(&mut keys_written)?;
1530
1531 let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1532 new_sst_files.append(&mut used_collector.new_sst_files);
1533 Ok(PartialMergeResult::Merged {
1534 new_sst_files,
1535 blob_seq_numbers_to_delete,
1536 keys_written,
1537 indices,
1538 })
1539 })
1540 .with_context(|| {
1541 format!("Failed to merge database files for family {family}")
1542 })?;
1543
1544 let Some((sst_files_len, blob_delete_len)) = merge_result
1545 .iter()
1546 .map(|r| {
1547 if let PartialMergeResult::Merged {
1548 new_sst_files,
1549 blob_seq_numbers_to_delete,
1550 indices: _,
1551 keys_written: _,
1552 } = r
1553 {
1554 (new_sst_files.len(), blob_seq_numbers_to_delete.len())
1555 } else {
1556 (0, 0)
1557 }
1558 })
1559 .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
1560 else {
1561 unreachable!()
1562 };
1563
1564 let mut new_sst_files = Vec::with_capacity(sst_files_len);
1565 let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
1566
1567 let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1568 let mut meta_file_builder = MetaFileBuilder::new(family);
1569
1570 let mut keys_written = 0;
1571 self.parallel_scheduler.block_in_place(|| {
1572 let guard = log_mutex.lock();
1573 let mut log = self.open_log()?;
1574 writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
1575 for result in merge_result {
1576 match result {
1577 PartialMergeResult::Merged {
1578 new_sst_files: merged_new_sst_files,
1579 blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
1580 keys_written: merged_keys_written,
1581 indices,
1582 } => {
1583 writeln!(
1584 log,
1585 "{family:3} | {meta_seq:08} | MERGE \
1586 ({merged_keys_written} keys):"
1587 )?;
1588 for i in indices.iter() {
1589 let seq = ssts_with_ranges[*i].seq;
1590 let (min, max) = ssts_with_ranges[*i].range().into_inner();
1591 writeln!(
1592 log,
1593 "{family:3} | {meta_seq:08} | {seq:08} INPUT | {}",
1594 range_to_str(min, max)
1595 )?;
1596 }
1597 for (seq, file, meta) in merged_new_sst_files {
1598 let min = meta.min_hash;
1599 let max = meta.max_hash;
1600 writeln!(
1601 log,
1602 "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
1603 ({})",
1604 range_to_str(min, max),
1605 meta.flags
1606 )?;
1607
1608 meta_file_builder.add(seq, meta);
1609 new_sst_files.push((seq, file));
1610 }
1611 blob_seq_numbers_to_delete
1612 .extend(merged_blob_seq_numbers_to_delete);
1613 keys_written += merged_keys_written;
1614 }
1615 PartialMergeResult::Move { seq, meta } => {
1616 let min = meta.min_hash;
1617 let max = meta.max_hash;
1618 writeln!(
1619 log,
1620 "{family:3} | {meta_seq:08} | {seq:08} MOVED | {}",
1621 range_to_str(min, max)
1622 )?;
1623
1624 meta_file_builder.add(seq, meta);
1625 }
1626 }
1627 }
1628 drop(log);
1629 drop(guard);
1630
1631 anyhow::Ok(())
1632 })?;
1633
1634 for &seq in sst_seq_numbers_to_delete.iter() {
1635 meta_file_builder.add_obsolete_sst_file(seq);
1636 }
1637
1638 let meta_file = {
1639 let _span = tracing::trace_span!("write meta file").entered();
1640 self.parallel_scheduler
1641 .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
1642 };
1643
1644 Ok(PartialResultPerFamily {
1645 new_meta_file: Some((meta_seq, meta_file)),
1646 new_sst_files,
1647 sst_seq_numbers_to_delete,
1648 blob_seq_numbers_to_delete,
1649 keys_written,
1650 })
1651 },
1652 )?;
1653
1654 for PartialResultPerFamily {
1655 new_meta_file: inner_new_meta_file,
1656 new_sst_files: mut inner_new_sst_files,
1657 sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
1658 blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
1659 keys_written: inner_keys_written,
1660 } in result
1661 {
1662 new_meta_files.extend(inner_new_meta_file);
1663 new_sst_files.append(&mut inner_new_sst_files);
1664 sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
1665 blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
1666 *keys_written += inner_keys_written;
1667 }
1668
1669 Ok(())
1670 }
1671
1672 pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcBytes>> {
1675 debug_assert!(family < FAMILIES, "Family index out of bounds");
1676 if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1677 panic!(
1679 "only single valued tables can be queried with `get', call `get_multiple` instead"
1680 )
1681 }
1682 let span = tracing::trace_span!(
1683 "database read",
1684 name = self.config.family_configs[family].name,
1685 result_size = tracing::field::Empty
1686 )
1687 .entered();
1688 let results = self.get_impl::<K, false>(family, key, &span)?;
1689 debug_assert!(results.len() <= 1, "get() should return at most one result");
1690 Ok(results.into_iter().next())
1691 }
1692
1693 pub fn get_multiple<K: QueryKey>(
1703 &self,
1704 family: usize,
1705 key: &K,
1706 ) -> Result<SmallVec<[ArcBytes; 1]>> {
1707 debug_assert!(family < FAMILIES, "Family index out of bounds");
1708 if self.config.family_configs[family].kind != FamilyKind::MultiValue {
1709 panic!("only multi-valued tables can be queried with `get_multiple`")
1711 }
1712 let span = tracing::trace_span!(
1713 "database read multiple",
1714 name = self.config.family_configs[family].name,
1715 result_count = tracing::field::Empty,
1716 result_size = tracing::field::Empty
1717 )
1718 .entered();
1719 let results = self.get_impl::<K, true>(family, key, &span)?;
1720 Ok(results)
1721 }
1722
    /// Shared lookup core behind [`Self::get`] and [`Self::get_multiple`].
    ///
    /// Hashes `key` and scans the meta files from newest to oldest. The
    /// `FIND_ALL` const parameter selects the semantics:
    /// - `false` (single value): return immediately on the first hit or
    ///   tombstone.
    /// - `true` (multi value): accumulate values across files until a
    ///   tombstone is reached or all files have been scanned.
    ///
    /// Records `result_size` (and `result_count` when `FIND_ALL`) on `span`,
    /// and updates hit/miss counters when built with the `stats` feature.
    fn get_impl<K: QueryKey, const FIND_ALL: bool>(
        &self,
        family: usize,
        key: &K,
        span: &EnteredSpan,
    ) -> Result<SmallVec<[ArcBytes; 1]>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        let mut output: SmallVec<[ArcBytes; 1]> = SmallVec::new();
        #[cfg(feature = "stats")]
        let mut found_in_sst = false;

        // Total byte size of all values pushed into `output` (FIND_ALL path).
        let mut size = 0;

        // Newest meta file first so fresher writes shadow older ones.
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup::<K, FIND_ALL>(
                family as u32,
                hash,
                key,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(values) => {
                        #[cfg(feature = "stats")]
                        {
                            found_in_sst = true;
                        }
                        // Remember that this key was read — presumably feeds
                        // compaction's warm/cold classification (see the
                        // used_key_hashes AMQF in the merge path); confirm.
                        inner.accessed_key_hashes[family].insert(hash);
                        for value in values {
                            match value {
                                LookupValue::Deleted => {
                                    // Tombstone: terminates the lookup. For
                                    // FIND_ALL, anything accumulated so far
                                    // (from newer files) is still returned.
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", "deleted");
                                        return Ok(SmallVec::new());
                                    }
                                    if output.is_empty() {
                                        span.record("result_size", "deleted");
                                    } else {
                                        span.record("result_size", size);
                                    }
                                    return Ok(output);
                                }
                                LookupValue::Slice { value } => {
                                    // Small value stored inline in the SST.
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", value.len());
                                        return Ok(SmallVec::from_buf([value]));
                                    }
                                    size += value.len();
                                    output.push(value);
                                }
                                LookupValue::Blob { sequence_number } => {
                                    // Large value stored out-of-line in a
                                    // separate blob file.
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                                    let blob = self.read_blob(sequence_number)?;
                                    if !FIND_ALL {
                                        span.record("result_size", blob.len());
                                        return Ok(SmallVec::from_buf([blob]));
                                    }
                                    size += blob.len();
                                    output.push(blob);
                                }
                            }
                        }
                    }
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }

        #[cfg(feature = "stats")]
        if !found_in_sst {
            self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        }

        if FIND_ALL {
            span.record("result_count", output.len());
        }
        if output.is_empty() {
            span.record("result_size", "not_found");
        } else {
            span.record("result_size", size);
        }
        Ok(output)
    }
1838
1839 pub fn batch_get<K: QueryKey>(
1840 &self,
1841 family: usize,
1842 keys: &[K],
1843 ) -> Result<Vec<Option<ArcBytes>>> {
1844 debug_assert!(family < FAMILIES, "Family index out of bounds");
1845 if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1846 panic!("only single valued tables can be queried with `batch_get'")
1848 }
1849 let span = tracing::trace_span!(
1850 "database batch read",
1851 name = self.config.family_configs[family].name,
1852 keys = keys.len(),
1853 not_found = tracing::field::Empty,
1854 deleted = tracing::field::Empty,
1855 result_size = tracing::field::Empty
1856 )
1857 .entered();
1858 let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
1859 let mut empty_cells = keys.len();
1860 for (index, key) in keys.iter().enumerate() {
1861 let hash = hash_key(key);
1862 cells.push((hash, index, None));
1863 }
1864 cells.sort_by_key(|(hash, _, _)| *hash);
1865 let inner = self.inner.read();
1866 for meta in inner.meta_files.iter().rev() {
1867 let _result = meta.batch_lookup(
1868 family as u32,
1869 keys,
1870 &mut cells,
1871 &mut empty_cells,
1872 &self.key_block_cache,
1873 &self.value_block_cache,
1874 )?;
1875
1876 #[cfg(feature = "stats")]
1877 {
1878 let crate::meta_file::MetaBatchLookupResult {
1879 family_miss,
1880 range_misses,
1881 quick_filter_misses,
1882 sst_misses,
1883 hits: _,
1884 } = _result;
1885 if family_miss {
1886 self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1887 }
1888 if range_misses > 0 {
1889 self.stats
1890 .miss_range
1891 .fetch_add(range_misses as u64, Ordering::Relaxed);
1892 }
1893 if quick_filter_misses > 0 {
1894 self.stats
1895 .miss_amqf
1896 .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
1897 }
1898 if sst_misses > 0 {
1899 self.stats
1900 .miss_key
1901 .fetch_add(sst_misses as u64, Ordering::Relaxed);
1902 }
1903 }
1904
1905 if empty_cells == 0 {
1906 break;
1907 }
1908 }
1909 let mut deleted = 0;
1910 let mut not_found = 0;
1911 let mut result_size = 0;
1912 let mut results = vec![None; keys.len()];
1913 for (hash, index, result) in cells {
1914 if let Some(result) = result {
1915 inner.accessed_key_hashes[family].insert(hash);
1916 let result = match result {
1917 LookupValue::Deleted => {
1918 #[cfg(feature = "stats")]
1919 self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1920 deleted += 1;
1921 None
1922 }
1923 LookupValue::Slice { value } => {
1924 #[cfg(feature = "stats")]
1925 self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1926 result_size += value.len();
1927 Some(value)
1928 }
1929 LookupValue::Blob { sequence_number } => {
1930 #[cfg(feature = "stats")]
1931 self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1932 let blob = self.read_blob(sequence_number)?;
1933 result_size += blob.len();
1934 Some(blob)
1935 }
1936 };
1937 results[index] = result;
1938 } else {
1939 #[cfg(feature = "stats")]
1940 self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1941 not_found += 1;
1942 }
1943 }
1944 span.record("not_found", not_found);
1945 span.record("deleted", deleted);
1946 span.record("result_size", result_size);
1947 Ok(results)
1948 }
1949
1950 #[cfg(feature = "stats")]
1952 pub fn statistics(&self) -> Statistics {
1953 let inner = self.inner.read();
1954 Statistics {
1955 meta_files: inner.meta_files.len(),
1956 sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
1957 key_block_cache: CacheStatistics::new(&self.key_block_cache),
1958 value_block_cache: CacheStatistics::new(&self.value_block_cache),
1959 hits: self.stats.hits_deleted.load(Ordering::Relaxed)
1960 + self.stats.hits_small.load(Ordering::Relaxed)
1961 + self.stats.hits_blob.load(Ordering::Relaxed),
1962 misses: self.stats.miss_global.load(Ordering::Relaxed),
1963 miss_family: self.stats.miss_family.load(Ordering::Relaxed),
1964 miss_range: self.stats.miss_range.load(Ordering::Relaxed),
1965 miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
1966 miss_key: self.stats.miss_key.load(Ordering::Relaxed),
1967 }
1968 }
1969
1970 pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1971 Ok(self
1972 .inner
1973 .read()
1974 .meta_files
1975 .iter()
1976 .rev()
1977 .map(|meta_file| {
1978 let entries = meta_file
1979 .entries()
1980 .iter()
1981 .map(|entry| {
1982 let amqf = entry.raw_amqf(meta_file.amqf_data());
1983 MetaFileEntryInfo {
1984 sequence_number: entry.sequence_number(),
1985 min_hash: entry.min_hash(),
1986 max_hash: entry.max_hash(),
1987 sst_size: entry.size(),
1988 flags: entry.flags(),
1989 amqf_size: entry.amqf_size(),
1990 amqf_entries: amqf.len(),
1991 block_count: entry.block_count(),
1992 }
1993 })
1994 .collect();
1995 MetaFileInfo {
1996 sequence_number: meta_file.sequence_number(),
1997 family: meta_file.family(),
1998 obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1999 entries,
2000 }
2001 })
2002 .collect())
2003 }
2004
    /// Prepares the database for shutdown: retries any deferred file
    /// deletions and, with the `print_stats` feature, dumps statistics.
    ///
    /// NOTE(review): `self.statistics()` only exists under the `stats`
    /// feature — presumably `print_stats` implies `stats` in Cargo.toml;
    /// confirm, otherwise enabling `print_stats` alone fails to compile.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        self.retry_deferred_deletions();
        Ok(())
    }
2013
2014 fn try_delete_files<'a>(
2017 dir: &'a Path,
2018 seqs: &'a [u32],
2019 ext: &'a str,
2020 ) -> impl Iterator<Item = u32> + 'a {
2021 seqs.iter()
2022 .copied()
2023 .filter(move |&seq| fs::remove_file(dir.join(format!("{seq:08}.{ext}"))).is_err())
2024 }
2025
2026 fn retry_deferred_deletions(&self) {
2031 let mut deferred = self.deferred_deletions.lock();
2032 deferred.retain(|entry| {
2033 let (seq, ext) = match *entry {
2034 DeferredDeletion::Sst(seq) => (seq, "sst"),
2035 DeferredDeletion::Meta(seq) => (seq, "meta"),
2036 DeferredDeletion::Blob(seq) => (seq, "blob"),
2037 };
2038 fs::remove_file(self.path.join(format!("{seq:08}.{ext}"))).is_err()
2040 });
2041 }
2042}
2043
/// Renders an inclusive `[min, max]` hash range as a fixed-width ASCII bar
/// (100 columns spanning the full `u64` space) followed by the hex bounds,
/// e.g. `  [=====]      … | 28f5c28f5c28f5c2-3d70a3d70a3d70a3`.
///
/// A zero-width range (both bounds in the same column) renders as `O`.
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // Scale a 64-bit hash down to a display column. Clamp to the last
    // column: previously a bound at (or near) u64::MAX scaled to
    // DISPLAY_SIZE, which is past the loop below, so the closing `]`
    // (or `[`/`O`) was silently dropped.
    let scale = |hash: u64| {
        ((hash as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize)
            .min(DISPLAY_SIZE - 1)
    };
    let start_pos = scale(min);
    let end_pos = scale(max);
    let mut range_str = String::with_capacity(DISPLAY_SIZE + 40);
    for i in 0..DISPLAY_SIZE {
        if i == start_pos && i == end_pos {
            range_str.push('O');
        } else if i == start_pos {
            range_str.push('[');
        } else if i == end_pos {
            range_str.push(']');
        } else if i > start_pos && i < end_pos {
            range_str.push('=');
        } else {
            range_str.push(' ');
        }
    }
    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
    range_str
}
2067
/// Diagnostic summary of one on-disk meta file, as produced by `meta_info`.
pub struct MetaFileInfo {
    /// Sequence number identifying this meta file.
    pub sequence_number: u32,
    /// Key family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files this meta file marks as obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// One entry per SST file referenced by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
2074
/// Per-SST-file statistics extracted from a single meta file entry.
pub struct MetaFileEntryInfo {
    /// Sequence number of the referenced SST file.
    pub sequence_number: u32,
    /// Smallest key hash covered by the SST file.
    pub min_hash: u64,
    /// Largest key hash covered by the SST file.
    pub max_hash: u64,
    /// Size of the entry's serialized AMQF data — presumably bytes; confirm.
    pub amqf_size: u32,
    /// Number of fingerprints stored in the AMQF.
    pub amqf_entries: usize,
    /// Size of the SST file in bytes.
    pub sst_size: u64,
    /// Entry flags (e.g. the WARM/COLD classification set during compaction).
    pub flags: MetaEntryFlags,
    /// Number of blocks in the SST file.
    pub block_count: u16,
}