// turbo_persistence/db.rs

use std::{
    borrow::Cow,
    collections::HashSet,
    fs::{self, File, OpenOptions, ReadDir},
    io::{BufWriter, Write},
    mem::{swap, take},
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::atomic::{AtomicBool, AtomicU32, Ordering},
};

use anyhow::{Context, Result, bail};
use byteorder::{BE, ReadBytesExt, WriteBytesExt};
use dashmap::DashSet;
use jiff::Timestamp;
use memmap2::Mmap;
use nohash_hasher::BuildNoHashHasher;
use parking_lot::{Mutex, RwLock};
use smallvec::SmallVec;

pub use crate::compaction::selector::CompactConfig;
use crate::{
    QueryKey,
    arc_slice::ArcSlice,
    compaction::selector::{Compactable, get_merge_segments},
    compression::decompress_into_arc,
    constants::{
        AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
        KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
        VALUE_BLOCK_CACHE_SIZE,
    },
    key::{StoreKey, hash_key},
    lookup_entry::{LookupEntry, LookupValue},
    merge_iter::MergeIter,
    meta_file::{AmqfCache, MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
    meta_file_builder::MetaFileBuilder,
    parallel_scheduler::ParallelScheduler,
    sst_filter::SstFilter,
    static_sorted_file::{BlockCache, SstLookupResult},
    static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
    write_batch::{FinishResult, WriteBatch},
};

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    pub hit_rate: f32,
    pub fill: f32,
    pub items: usize,
    pub size: u64,
    pub hits: u64,
    pub misses: u64,
}

#[cfg(feature = "stats")]
impl CacheStatistics {
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
            hit_rate: hits as f32 / (hits + misses) as f32,
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    pub meta_files: usize,
    pub sst_files: usize,
    pub key_block_cache: CacheStatistics,
    pub value_block_cache: CacheStatistics,
    pub amqf_cache: CacheStatistics,
    pub hits: u64,
    pub misses: u64,
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}

#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}

/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
/// using a single write batch. It allows for concurrent reads.
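///
/// # Example
///
/// A minimal lifecycle sketch (not compiled here; `MyScheduler` and `MyKey` are
/// hypothetical placeholders for a `ParallelScheduler` implementation and a
/// `StoreKey` type; the `WriteBatch` API lives in `write_batch.rs`):
///
/// ```ignore
/// let db: TurboPersistence<MyScheduler, 1> = TurboPersistence::open(path)?;
/// let batch = db.write_batch::<MyKey>()?;
/// // ... fill the batch (see write_batch.rs) ...
/// db.commit_write_batch(batch)?;
/// db.full_compact()?;
/// ```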
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    parallel_scheduler: S,
    /// The path to the directory where the database is stored
    path: PathBuf,
    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
    /// no modifications to the database are performed.
    read_only: bool,
    /// The inner state of the database. Writes update this state.
    inner: RwLock<Inner<FAMILIES>>,
    /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent
    /// write operations.
    active_write_operation: AtomicBool,
    /// A cache for deserialized AMQF filters.
    amqf_cache: AmqfCache,
    /// A cache for decompressed key blocks.
    key_block_cache: BlockCache,
    /// A cache for decompressed value blocks.
    value_block_cache: BlockCache,
    /// Statistics for the database.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}

/// The inner state of the database.
struct Inner<const FAMILIES: usize> {
    /// The list of meta files in the database. This is used to derive the SST files.
    meta_files: Vec<MetaFile>,
    /// The current sequence number for the database.
    current_sequence_number: u32,
    /// The in-progress set of hashes of keys that have been accessed.
    /// It will be flushed onto disk (into a meta file) on the next commit.
    /// It's a DashSet to allow modification while holding only a read lock on Inner.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}

pub struct CommitOptions {
    new_meta_files: Vec<(u32, File)>,
    new_sst_files: Vec<(u32, File)>,
    new_blob_files: Vec<(u32, File)>,
    sst_seq_numbers_to_delete: Vec<u32>,
    blob_seq_numbers_to_delete: Vec<u32>,
    sequence_number: u32,
    keys_written: u64,
}
impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup if the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting
    /// files, so it's fast.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only(path: PathBuf) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, Default::default())
    }
}

impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
    fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
        Self {
            parallel_scheduler,
            path,
            read_only,
            inner: RwLock::new(Inner {
                meta_files: Vec::new(),
                current_sequence_number: 0,
                accessed_key_hashes: [(); FAMILIES]
                    .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
            }),
            active_write_operation: AtomicBool::new(false),
            amqf_cache: AmqfCache::with(
                AMQF_CACHE_SIZE as usize / AMQF_AVG_SIZE,
                AMQF_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            key_block_cache: BlockCache::with(
                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
                KEY_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            value_block_cache: BlockCache::with(
                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
                VALUE_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            #[cfg(feature = "stats")]
            stats: TrackedStats::default(),
        }
    }

    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup if the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting
    /// files, so it's fast.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        let mut db = Self::new(path, false, parallel_scheduler);
        db.open_directory(false)?;
        Ok(db)
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only_with_parallel_scheduler(
        path: PathBuf,
        parallel_scheduler: S,
    ) -> Result<Self> {
        let mut db = Self::new(path, true, parallel_scheduler);
        db.open_directory(true)?;
        Ok(db)
    }

    /// Performs the initial check on the database directory.
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }

    /// Creates the directory and initializes it.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }

    /// Initializes the directory by creating the CURRENT file.
    fn init_directory(&mut self) -> Result<()> {
        let mut current = File::create(self.path.join("CURRENT"))?;
        current.write_u32::<BE>(0)?;
        current.flush()?;
        Ok(())
    }

    /// Loads an existing database directory and performs cleanup if necessary.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    // Remove the files that are marked for deletion
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // ignore blobs and sst, they are read when needed
                        }
                        _ => {
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read
                    }
                    Some("LOG") => {
                        // Ignored, write-only
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }

    /// Reads and decompresses a blob file. This is not backed by any cache.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
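        // Blob layout, as read below: a big-endian u32 with the uncompressed
        // length, followed by the compressed payload filling the rest of the file.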
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()?;

        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
        Ok(ArcSlice::from(buffer))
    }

    /// Returns true if the database is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.read().meta_files.is_empty()
    }

    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
    /// time. The WriteBatch needs to be committed with [`TurboPersistence::commit_write_batch`].
    /// Note that the WriteBatch might start writing data to disk while it's being filled with
    /// data. This data will only become visible after the WriteBatch is committed.
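    ///
    /// Illustrative sketch (assumes a hypothetical key type `MyKey: StoreKey`);
    /// requesting a second batch while one is active fails:
    ///
    /// ```ignore
    /// let batch = db.write_batch::<MyKey>()?;
    /// assert!(db.write_batch::<MyKey>().is_err()); // only one writer at a time
    /// db.commit_write_batch(batch)?;
    /// ```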
    pub fn write_batch<K: StoreKey + Send + Sync + 'static>(
        &self,
    ) -> Result<WriteBatch<K, S, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operation is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        Ok(WriteBatch::new(
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
        ))
    }

    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }

    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
    /// visible to readers.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // len is only a snapshot at that time and it can change while we create the filter.
            // So we give it 5% more space to make resizes less likely.
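            // e.g. a snapshot of 1900 hashes reserves capacity for 2000 fingerprints.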
            let initial_capacity = set.len() * 20 / 19;
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // This drains items from the set. Due to concurrency it might not be empty
            // afterwards, but that's fine. The remainder will be part of the next commit.
            set.retain(|hash| {
                // Performance-wise it would usually be better to insert sorted fingerprints, but we
                // assume that hashes are equally distributed, which makes it unnecessary.
                // Inserting in the order of the dashset's buckets is good for cache locality.
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }

    /// fsyncs the new files and updates the CURRENT file. Updates the database state to include
    /// the new files.
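    ///
    /// In order, this (1) fsyncs and reopens the new meta files, (2) fsyncs the new
    /// SST and blob files, (3) splices the new meta files into the in-memory state,
    /// (4) writes a `*.del` file for files scheduled for deletion, (5) updates
    /// CURRENT, (6) removes the deleted files, and (7) appends a summary to LOG.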
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let sync_span = tracing::info_span!("sync new files").entered();
        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;
        drop(sync_span);

        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // apply_and_get_remove needs to run in reverse order
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // Write the *.del file, marking the selected files for deletion
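                // The *.del format is just concatenated big-endian u32 sequence
                // numbers; load_directory reads them back until EOF on recovery.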
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ         | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST    | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                fn write_seq_numbers<W: std::io::Write, T, I>(
                    log: &mut W,
                    items: I,
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()>
                where
                    I: IntoIterator<Item = T>,
                {
                    let items: Vec<T> = items.into_iter().collect();
                    for chunk in items.chunks(15) {
                        write!(log, "    |          |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(&mut log, sst_seq_numbers_to_delete, "SST DELETED", |&seq| {
                    seq
                })?;
                write_seq_numbers(
                    &mut log,
                    meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                #[cfg(feature = "verbose_log")]
                {
                    writeln!(log, "New database state:")?;
                    writeln!(log, "FAM | META SEQ | SST SEQ  FLAGS | RANGE")?;
                    let inner = self.inner.read();
                    let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                        let mut set = HashSet::new();
                        move |family| set.insert(*family)
                    });
                    for family in families {
                        for meta in inner.meta_files.iter() {
                            if meta.family() != family {
                                continue;
                            }
                            let meta_seq = meta.sequence_number();
                            for entry in meta.entries().iter() {
                                let seq = entry.sequence_number();
                                let range = entry.range();
                                writeln!(
                                    log,
                                    "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                    entry.flags(),
                                    range_to_str(range.min_hash, range.max_hash)
                                )?;
                            }
                        }
                    }
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }

    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
    /// duplicate keys and separating all key ranges into unique files.
    pub fn full_compact(&self) -> Result<()> {
        self.compact(&CompactConfig {
            min_merge_count: 2,
            optimal_merge_count: usize::MAX,
            max_merge_count: usize::MAX,
            max_merge_bytes: u64::MAX,
            min_merge_duplication_bytes: 0,
            optimal_merge_duplication_bytes: u64::MAX,
            max_merge_segment_count: usize::MAX,
        })?;
        Ok(())
    }

    /// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
    /// files is above the given threshold. The coverage is the average number of SST files that
    /// need to be read to find a key. It also limits the maximum number of SST files that are
    /// merged at once, which is the main factor for the runtime of the compaction.
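    ///
    /// For example, [`TurboPersistence::full_compact`] uses the most permissive
    /// configuration; a partial compaction would tighten the limits (the values
    /// below are illustrative, not recommendations):
    ///
    /// ```ignore
    /// db.compact(&CompactConfig {
    ///     min_merge_count: 2,
    ///     optimal_merge_count: 8,
    ///     max_merge_count: 32,
    ///     max_merge_bytes: 512 * 1024 * 1024,
    ///     min_merge_duplication_bytes: 1024 * 1024,
    ///     optimal_merge_duplication_bytes: 64 * 1024 * 1024,
    ///     max_merge_segment_count: 8,
    /// })?;
    /// ```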
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read-only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operation is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }

    /// Internal function to perform a compaction.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
            flags: MetaEntryFlags,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }

            fn category(&self) -> u8 {
                // Cold and non-cold files are placed separately so we pass different category
                // values to ensure they are not merged together.
                if self.flags.cold() { 1 } else { 0 }
            }
        }

        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                        flags: entry.flags(),
                    })
            })
            .collect::<Vec<_>>();

        let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        let log_mutex = Mutex::new(());

        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        let mut compact_config = compact_config.clone();
        let merge_jobs = sst_by_family
            .into_iter()
            .enumerate()
            .filter_map(|(family, ssts_with_ranges)| {
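                // Families share a single merge-segment budget: each family's
                // selected jobs are subtracted below, and once the budget hits
                // zero the remaining families are skipped for this round.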
                if compact_config.max_merge_segment_count == 0 {
                    return None;
                }
                let (merge_jobs, real_merge_job_size) =
                    get_merge_segments(&ssts_with_ranges, &compact_config);
                compact_config.max_merge_segment_count -= real_merge_job_size;
                Some((family, ssts_with_ranges, merge_jobs))
            })
            .collect::<Vec<_>>();

        let mut used_key_hashes = [(); FAMILIES].map(|_| Vec::new());

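        // Gather, per family, the AMQFs of key hashes that readers have accessed
        // (persisted via the meta files). They are used below to split merged
        // entries into "used" (warm) and "unused" (cold) output files.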
        {
            for &(family, ..) in merge_jobs.iter() {
                used_key_hashes[family].extend(
                    meta_files
                        .iter()
                        .filter(|m| m.family() == family as u32)
                        .filter_map(|meta_file| {
                            meta_file.deserialize_used_key_hashes_amqf().transpose()
                        })
                        .collect::<Result<Vec<_>>>()?,
                );
            }
        }

        let result = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                merge_jobs,
                |(family, ssts_with_ranges, merge_jobs)| {
                    let family = family as u32;

                    if merge_jobs.is_empty() {
                        return Ok(PartialResultPerFamily {
                            new_meta_file: None,
                            new_sst_files: Vec::new(),
                            sst_seq_numbers_to_delete: Vec::new(),
                            blob_seq_numbers_to_delete: Vec::new(),
                            keys_written: 0,
                        });
                    }

                    // Later we will remove the merged files
                    let sst_seq_numbers_to_delete = merge_jobs
                        .iter()
                        .filter(|l| l.len() > 1)
                        .flat_map(|l| l.iter().copied())
                        .map(|index| ssts_with_ranges[index].seq)
                        .collect::<Vec<_>>();

                    // Merge SST files
                    let span = tracing::trace_span!("merge files");
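                    // A merge job with several inputs rewrites them (`Merged`);
                    // a single-input job keeps the existing SST file on disk and
                    // merely re-registers it in the new meta file (`Move`).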
                    enum PartialMergeResult<'l> {
                        Merged {
                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                            blob_seq_numbers_to_delete: Vec<u32>,
                            keys_written: u64,
                            indicies: SmallVec<[usize; 1]>,
                        },
                        Move {
                            seq: u32,
                            meta: StaticSortedFileBuilderMeta<'l>,
                        },
                    }
                    let merge_result = self
                        .parallel_scheduler
                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                            merge_jobs,
                            |indicies| {
                                let _span = span.clone().entered();
                                if indicies.len() == 1 {
                                    // If we only have one file, we can just move it
                                    let index = indicies[0];
                                    let meta_index = ssts_with_ranges[index].meta_index;
                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                    let meta_file = &meta_files[meta_index];
                                    let entry = meta_file.entry(index_in_meta);
                                    let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
                                    let meta = StaticSortedFileBuilderMeta {
                                        min_hash: entry.min_hash(),
                                        max_hash: entry.max_hash(),
                                        amqf,
                                        key_compression_dictionary_length: entry
                                            .key_compression_dictionary_length(),
                                        block_count: entry.block_count(),
                                        size: entry.size(),
                                        flags: entry.flags(),
                                        entries: 0,
                                    };
                                    return Ok(PartialMergeResult::Move {
                                        seq: entry.sequence_number(),
                                        meta,
                                    });
                                }

                                fn create_sst_file<'l, S: ParallelScheduler>(
                                    parallel_scheduler: &S,
                                    entries: &[LookupEntry<'l>],
                                    total_key_size: usize,
                                    path: &Path,
                                    seq: u32,
                                    flags: MetaEntryFlags,
                                ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                                {
                                    let _span =
                                        tracing::trace_span!("write merged sst file").entered();
                                    let (meta, file) = parallel_scheduler.block_in_place(|| {
                                        write_static_stored_file(
                                            entries,
                                            total_key_size,
                                            &path.join(format!("{seq:08}.sst")),
                                            flags,
                                        )
                                    })?;
                                    Ok((seq, file, meta))
                                }

                                // Iterate all SST files
                                let iters = indicies
                                    .iter()
                                    .map(|&index| {
                                        let meta_index = ssts_with_ranges[index].meta_index;
                                        let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                        let meta = &meta_files[meta_index];
                                        meta.entry(index_in_meta)
                                            .sst(meta)?
                                            .iter(key_block_cache, value_block_cache)
                                    })
                                    .collect::<Result<Vec<_>>>()?;

                                let iter = MergeIter::new(iters.into_iter())?;

                                // TODO figure out how to delete blobs when they are no longer
                                // referenced
                                let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                                let mut keys_written = 0;

                                let mut current: Option<LookupEntry<'_>> = None;

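                                // Collects entries for one output class (warm or
                                // cold). `last_entries` holds the previously
                                // completed batch so the final batches can be
                                // merged and re-split, avoiding a tiny trailing
                                // SST file.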
                                #[derive(Default)]
                                struct Collector<'l> {
                                    entries: Vec<LookupEntry<'l>>,
                                    total_key_size: usize,
                                    total_value_size: usize,
                                    last_entries: Vec<LookupEntry<'l>>,
                                    last_entries_total_key_size: usize,
                                    new_sst_files:
                                        Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                                }
                                let mut used_collector = Collector::default();
                                let mut unused_collector = Collector::default();
                                for entry in iter {
                                    let entry = entry?;

                                    // Remove duplicates
                                    if let Some(current) = current.take() {
                                        if current.key != entry.key {
                                            let is_used =
                                                used_key_hashes[family as usize].iter().any(
                                                    |amqf| amqf.contains_fingerprint(current.hash),
                                                );
                                            let collector = if is_used {
                                                &mut used_collector
                                            } else {
                                                &mut unused_collector
                                            };
                                            let key_size = current.key.len();
                                            let value_size =
                                                current.value.uncompressed_size_in_sst();
                                            collector.total_key_size += key_size;
                                            collector.total_value_size += value_size;

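                                            // On overflow, the batch completed in
                                            // the previous round is written out,
                                            // while the just-completed batch is
                                            // held back in `last_entries` so the
                                            // tail can still be rebalanced at the
                                            // end (see below).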
1072                                            if collector.total_key_size + collector.total_value_size
1073                                                > DATA_THRESHOLD_PER_COMPACTED_FILE
1074                                                || collector.entries.len()
1075                                                    >= MAX_ENTRIES_PER_COMPACTED_FILE
1076                                            {
1077                                                let selected_total_key_size =
1078                                                    collector.last_entries_total_key_size;
1079                                                swap(
1080                                                    &mut collector.entries,
1081                                                    &mut collector.last_entries,
1082                                                );
1083                                                collector.last_entries_total_key_size =
1084                                                    collector.total_key_size - key_size;
1085                                                collector.total_key_size = key_size;
1086                                                collector.total_value_size = value_size;
1087
1088                                                if !collector.entries.is_empty() {
1089                                                    let seq = sequence_number
1090                                                        .fetch_add(1, Ordering::SeqCst)
1091                                                        + 1;
1092
1093                                                    keys_written += collector.entries.len() as u64;
1094
1095                                                    let mut flags = MetaEntryFlags::default();
1096                                                    flags.set_cold(!is_used);
1097                                                    collector.new_sst_files.push(create_sst_file(
1098                                                        &self.parallel_scheduler,
1099                                                        &collector.entries,
1100                                                        selected_total_key_size,
1101                                                        path,
1102                                                        seq,
1103                                                        flags,
1104                                                    )?);
1105
1106                                                    collector.entries.clear();
1107                                                }
1108                                            }
1109
1110                                            collector.entries.push(current);
1111                                        } else {
1112                                            // Override value
1113                                            // TODO delete blob file
1114                                        }
1115                                    }
1116                                    current = Some(entry);
1117                                }
1118                                if let Some(entry) = current {
1119                                    let is_used = used_key_hashes[family as usize]
1120                                        .iter()
1121                                        .any(|amqf| amqf.contains_fingerprint(entry.hash));
1122                                    let collector = if is_used {
1123                                        &mut used_collector
1124                                    } else {
1125                                        &mut unused_collector
1126                                    };
1127
1128                                    collector.total_key_size += entry.key.len();
1129                                    // Obsolete as we no longer need total_value_size
1130                                    // total_value_size += entry.value.uncompressed_size_in_sst();
1131                                    collector.entries.push(entry);
1132                                }
1133
1134                                // If we have one set of entries left, write them to a new SST file
1135                                for (collector, flags) in [
1136                                    (&mut used_collector, MetaEntryFlags::WARM),
1137                                    (&mut unused_collector, MetaEntryFlags::COLD),
1138                                ] {
1139                                    if collector.last_entries.is_empty()
1140                                        && !collector.entries.is_empty()
1141                                    {
1142                                        let seq =
1143                                            sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1144
1145                                        keys_written += collector.entries.len() as u64;
1146                                        collector.new_sst_files.push(create_sst_file(
1147                                            &self.parallel_scheduler,
1148                                            &collector.entries,
1149                                            collector.total_key_size,
1150                                            path,
1151                                            seq,
1152                                            flags,
1153                                        )?);
1154                                    } else
1155                                    // If we have two sets of entries left, merge them and
1156                                    // split it into two SST files, to avoid having a
1157                                    // single SST file that is very small.
1158                                    if !collector.last_entries.is_empty() {
1159                                        collector.last_entries.append(&mut collector.entries);
1160
1161                                        collector.last_entries_total_key_size +=
1162                                            collector.total_key_size;
1163
1164                                        let (part1, part2) = collector
1165                                            .last_entries
1166                                            .split_at(collector.last_entries.len() / 2);
1167
1168                                        let seq1 =
1169                                            sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1170                                        let seq2 =
1171                                            sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1172
1173                                        keys_written += part1.len() as u64;
1174                                        collector.new_sst_files.push(create_sst_file(
1175                                            &self.parallel_scheduler,
1176                                            part1,
1177                                            // We don't know the exact sizes so we estimate them
1178                                            collector.last_entries_total_key_size / 2,
1179                                            path,
1180                                            seq1,
1181                                            flags,
1182                                        )?);
1183
1184                                        keys_written += part2.len() as u64;
1185                                        collector.new_sst_files.push(create_sst_file(
1186                                            &self.parallel_scheduler,
1187                                            part2,
1188                                            collector.last_entries_total_key_size / 2,
1189                                            path,
1190                                            seq2,
1191                                            flags,
1192                                        )?);
1193                                    }
1194                                }
1195                                let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1196                                new_sst_files.append(&mut used_collector.new_sst_files);
1197                                Ok(PartialMergeResult::Merged {
1198                                    new_sst_files,
1199                                    blob_seq_numbers_to_delete,
1200                                    keys_written,
1201                                    indicies,
1202                                })
1203                            },
1204                        )
1205                        .with_context(|| {
1206                            format!("Failed to merge database files for family {family}")
1207                        })?;
1208
                    let Some((sst_files_len, blob_delete_len)) = merge_result
                        .iter()
                        .map(|r| {
                            if let PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                indicies: _,
                                keys_written: _,
                            } = r
                            {
                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                            } else {
                                (0, 0)
                            }
                        })
                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                    else {
                        unreachable!()
                    };

                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                    let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let mut meta_file_builder = MetaFileBuilder::new(family);

                    let mut keys_written = 0;
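                    // Writing the compaction log is serialized via log_mutex so that entries
                    // from families compacted in parallel do not interleave.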
                    self.parallel_scheduler.block_in_place(|| {
                        let guard = log_mutex.lock();
                        let mut log = self.open_log()?;
                        writeln!(log, "{family:3} | {meta_seq:08} | Compaction:")?;
                        for result in merge_result {
                            match result {
                                PartialMergeResult::Merged {
                                    new_sst_files: merged_new_sst_files,
                                    blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                                    keys_written: merged_keys_written,
                                    indicies,
                                } => {
                                    writeln!(
                                        log,
                                        "{family:3} | {meta_seq:08} | MERGE \
                                         ({merged_keys_written} keys):"
                                    )?;
                                    for i in indicies.iter() {
                                        let seq = ssts_with_ranges[*i].seq;
                                        let (min, max) = ssts_with_ranges[*i].range().into_inner();
                                        writeln!(
                                            log,
                                            "{family:3} | {meta_seq:08} | {seq:08} INPUT  | {}",
                                            range_to_str(min, max)
                                        )?;
                                    }
                                    for (seq, file, meta) in merged_new_sst_files {
                                        let min = meta.min_hash;
                                        let max = meta.max_hash;
                                        writeln!(
                                            log,
                                            "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
                                             ({})",
                                            range_to_str(min, max),
                                            meta.flags
                                        )?;

                                        meta_file_builder.add(seq, meta);
                                        new_sst_files.push((seq, file));
                                    }
                                    blob_seq_numbers_to_delete
                                        .extend(merged_blob_seq_numbers_to_delete);
                                    keys_written += merged_keys_written;
                                }
                                PartialMergeResult::Move { seq, meta } => {
                                    let min = meta.min_hash;
                                    let max = meta.max_hash;
                                    writeln!(
                                        log,
                                        "{family:3} | {meta_seq:08} | {seq:08} MOVED  | {}",
                                        range_to_str(min, max)
                                    )?;

                                    meta_file_builder.add(seq, meta);
                                }
                            }
                        }
                        drop(log);
                        drop(guard);

                        anyhow::Ok(())
                    })?;

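                    // Record the merged input SSTs as obsolete in the new meta file.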
                    for &seq in sst_seq_numbers_to_delete.iter() {
                        meta_file_builder.add_obsolete_sst_file(seq);
                    }

                    let meta_file = {
                        let _span = tracing::trace_span!("write meta file").entered();
                        self.parallel_scheduler
                            .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
                    };

                    Ok(PartialResultPerFamily {
                        new_meta_file: Some((meta_seq, meta_file)),
                        new_sst_files,
                        sst_seq_numbers_to_delete,
                        blob_seq_numbers_to_delete,
                        keys_written,
                    })
                },
            )?;

        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }

    /// Get a value from the database. Returns None if the key is not found. The returned value
    /// may hold onto a block of the database, so it should not be held long-term.
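    ///
    /// A minimal usage sketch (illustrative only; `db` stands for an opened database instance
    /// and `key` for any value implementing `QueryKey`):
    ///
    /// ```ignore
    /// if let Some(value) = db.get(0, &key)? {
    ///     // Copy the bytes out if they must outlive the read.
    ///     let owned: Vec<u8> = value.to_vec();
    /// }
    /// ```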
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        debug_assert!(family < FAMILIES, "Family index out of bounds");
        let hash = hash_key(key);
        let inner = self.inner.read();
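        // Scan the meta files newest-first so that the most recent write for a key wins.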
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.amqf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => {
                        inner.accessed_key_hashes[family].insert(hash);
                        match result {
                            LookupValue::Deleted => {
                                #[cfg(feature = "stats")]
                                self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                                return Ok(None);
                            }
                            LookupValue::Slice { value } => {
                                #[cfg(feature = "stats")]
                                self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                                return Ok(Some(value));
                            }
                            LookupValue::Blob { sequence_number } => {
                                #[cfg(feature = "stats")]
                                self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                                let blob = self.read_blob(sequence_number)?;
                                return Ok(Some(blob));
                            }
                        }
                    }
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }

    /// Returns database statistics. Only available with the `stats` feature.
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            amqf_cache: CacheStatistics::new(&self.amqf_cache),
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }

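    /// Returns debugging information about all meta files and their SST entries, newest first.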
    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
        Ok(self
            .inner
            .read()
            .meta_files
            .iter()
            .rev()
            .map(|meta_file| {
                let entries = meta_file
                    .entries()
                    .iter()
                    .map(|entry| {
                        let amqf = entry.raw_amqf(meta_file.amqf_data());
                        MetaFileEntryInfo {
                            sequence_number: entry.sequence_number(),
                            min_hash: entry.min_hash(),
                            max_hash: entry.max_hash(),
                            sst_size: entry.size(),
                            flags: entry.flags(),
                            amqf_size: entry.amqf_size(),
                            amqf_entries: amqf.len(),
                            key_compression_dictionary_size: entry
                                .key_compression_dictionary_length(),
                            block_count: entry.block_count(),
                        }
                    })
                    .collect();
                MetaFileInfo {
                    sequence_number: meta_file.sequence_number(),
                    family: meta_file.family(),
                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
                    entries,
                }
            })
            .collect())
    }

    /// Shuts down the database. This will print statistics if the `print_stats` feature is
    /// enabled.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
}

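/// Renders a key-hash range as a 100-column ASCII bar spanning the full u64 hash space,
/// followed by the hexadecimal bounds. A range that collapses to a single column is drawn as
/// `O`; wider ranges as a proportionally positioned `[===]` segment. Used to visualize SST
/// coverage in the compaction log.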
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    let start_pos = (min as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize;
    let end_pos = (max as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize;
    let mut range_str = String::new();
    for i in 0..DISPLAY_SIZE {
        if i == start_pos && i == end_pos {
            range_str.push('O');
        } else if i == start_pos {
            range_str.push('[');
        } else if i == end_pos {
            range_str.push(']');
        } else if i > start_pos && i < end_pos {
            range_str.push('=');
        } else {
            range_str.push(' ');
        }
    }
    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
    range_str
}

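/// Debugging information about a single meta file, as returned by `meta_info`.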
pub struct MetaFileInfo {
    pub sequence_number: u32,
    pub family: u32,
    pub obsolete_sst_files: Vec<u32>,
    pub entries: Vec<MetaFileEntryInfo>,
}

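/// Debugging information about a single SST entry within a meta file.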
pub struct MetaFileEntryInfo {
    pub sequence_number: u32,
    pub min_hash: u64,
    pub max_hash: u64,
    pub amqf_size: u32,
    pub amqf_entries: usize,
    pub sst_size: u64,
    pub flags: MetaEntryFlags,
    pub key_compression_dictionary_size: u16,
    pub block_count: u16,
}