Skip to main content

turbo_persistence/
db.rs

1use std::{
2    borrow::Cow,
3    collections::HashSet,
4    fs::{self, File, OpenOptions, ReadDir},
5    io::{BufWriter, Write},
6    mem::take,
7    ops::RangeInclusive,
8    path::{Path, PathBuf},
9    sync::{
10        OnceLock,
11        atomic::{AtomicBool, AtomicU32, Ordering},
12    },
13};
14
15use anyhow::{Context, Result, bail};
16use byteorder::{BE, ReadBytesExt, WriteBytesExt};
17use dashmap::DashSet;
18use jiff::Timestamp;
19use memmap2::Mmap;
20use nohash_hasher::BuildNoHashHasher;
21use parking_lot::{Mutex, RwLock};
22use smallvec::SmallVec;
23use tracing::span::EnteredSpan;
24
25pub use crate::compaction::selector::CompactConfig;
26use crate::{
27    DbConfig, FamilyKind, QueryKey,
28    arc_bytes::ArcBytes,
29    compaction::selector::{Compactable, get_merge_segments},
30    compression::{checksum_block, decompress_into_arc},
31    constants::{
32        DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
33        MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
34    },
35    key::{StoreKey, hash_key},
36    lookup_entry::{IterValue, LookupEntry, LookupValue},
37    merge_iter::MergeIter,
38    meta_file::{MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
39    meta_file_builder::MetaFileBuilder,
40    mmap_helper::advise_mmap_for_persistence,
41    parallel_scheduler::ParallelScheduler,
42    rc_bytes::RcBytes,
43    sst_filter::SstFilter,
44    static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFileIter},
45    static_sorted_file_builder::{StaticSortedFileBuilderMeta, StreamingSstWriter},
46    write_batch::{FinishResult, WriteBatch},
47};
48
/// Usage figures for a single block cache, captured at one point in time.
///
/// Only available when the `stats` feature is enabled.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Share of lookups that were hits, i.e. `hits / (hits + misses)`.
    pub hit_rate: f32,
    /// Share of the cache capacity currently in use, i.e. `size / capacity`.
    pub fill: f32,
    /// Number of entries held by the cache.
    pub items: usize,
    /// Total weight of the cached entries as reported by the cache.
    pub size: u64,
    /// Hit counter reported by the cache.
    pub hits: u64,
    /// Miss counter reported by the cache.
    pub misses: u64,
}
59
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Captures a snapshot of the counters exposed by `cache`.
    ///
    /// `hit_rate` and `fill` are reported as `0.0` when their denominator is zero (no lookup
    /// recorded yet, or a cache without capacity) instead of the `NaN`/infinity a plain
    /// floating point division would yield.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        // Divides two counters, mapping a zero denominator to `0.0`.
        fn ratio(numerator: f32, denominator: f32) -> f32 {
            if denominator == 0.0 {
                0.0
            } else {
                numerator / denominator
            }
        }

        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
            hit_rate: ratio(hits as f32, (hits + misses) as f32),
            fill: ratio(size as f32, cache.capacity() as f32),
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
83
/// Aggregated database statistics.
///
/// Only available when the `stats` feature is enabled.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of meta files.
    pub meta_files: usize,
    /// Number of SST files.
    pub sst_files: usize,
    /// Figures for the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Figures for the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Number of lookups that found an entry.
    pub hits: u64,
    /// Number of lookups that found nothing.
    pub misses: u64,
    // NOTE(review): the four counters below presumably break misses down by the stage that
    // rejected the lookup (family, key range, AMQF, key comparison); confirm against the code
    // that fills this struct.
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}
98
/// Raw lookup counters kept by the database when the `stats` feature is enabled.
///
/// Every counter is an atomic, so it can be updated through a shared reference.
/// All counters start at zero (see the `Default` derive).
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    // NOTE(review): the `hits_*` counters presumably split hits by the kind of entry found
    // (deleted / small value / blob); confirm against the lookup code.
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    // NOTE(review): the `miss_*` counters presumably record which stage rejected a lookup;
    // confirm against the lookup code.
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}
111
/// What currently occupies the single write slot of the database.
///
/// The slot itself is an `Option`; `None` means that no write operation is running.
enum ActiveWriteState {
    /// A write batch or a compaction is running.
    ///
    /// Carries a human-readable name of the operation, used when reporting that the slot is
    /// already taken.
    Active(&'static str),
    /// A write or compaction failed and its rollback failed as well.
    ///
    /// The slot stays in this state, so no further writes are possible.
    Error,
}
121
/// One superseded file that could not be deleted yet and is queued for another attempt.
///
/// Deleting a memory-mapped file is safe on Linux/macOS, so the queue is normally empty
/// there. On Windows an open memory map prevents deletion; such files end up here and are
/// retried on the next commit or at shutdown.
///
/// Each variant names the kind of file; the payload is presumably the file's sequence
/// number (TODO confirm against the code that drains the queue).
enum DeferredDeletion {
    Sst(u32),
    Meta(u32),
    Blob(u32),
}
132
133/// RAII guard for an active write operation.
134///
135/// When dropped without [`WriteOperationGuard::success`] being called first, the guard rolls back
136/// the operation by deleting any files whose sequence number exceeds `seq_before` (the sequence
137/// number at the time the operation started). If rollback itself fails the write slot is set to
138/// [`ActiveWriteState::Error`], permanently disabling further writes.
139pub(crate) struct WriteOperationGuard<'a> {
140    /// Reference to the active-write-operation slot, so we can clear or error it on drop.
141    active: &'a Mutex<Option<ActiveWriteState>>,
142    /// Database directory path, needed for orphan-file deletion during rollback.
143    path: &'a Path,
144    /// Sequence number at the time the operation started (= the last committed seq on disk).
145    /// Files with seq > this were created by the current operation and must be deleted on
146    /// rollback.
147    seq_before: u32,
148    /// Set to `true` by [`WriteOperationGuard::success`] to skip rollback on drop.
149    succeeded: bool,
150}
151
152impl WriteOperationGuard<'_> {
153    /// Mark the operation as successfully completed.
154    ///
155    /// After this call the guard's `Drop` impl will release the write slot without rolling back.
156    pub(crate) fn success(&mut self) {
157        self.succeeded = true;
158    }
159}
160
161/// Deletes all files in `path` whose numeric stem is greater than `seq_before`.
162///
163/// Called on rollback to clean up any SST, meta, blob, or del files written during a
164/// failed write operation or compaction.
165fn delete_orphan_files(path: &Path, seq_before: u32) -> Result<()> {
166    // Restore CURRENT to seq_before first. The failure may have happened mid-write
167    // to CURRENT, leaving it partially written. Writing seq_before makes the
168    // on-disk state consistent before we start deleting orphan files.
169    let mut current_file = OpenOptions::new()
170        .write(true)
171        .truncate(false)
172        .read(false)
173        .open(path.join("CURRENT"))?;
174    current_file.write_u32::<BE>(seq_before)?;
175    current_file.sync_all()?;
176
177    for entry in fs::read_dir(path)? {
178        let entry = entry?;
179        let path = entry.path();
180        if let Some(ext) = path.extension().and_then(|s| s.to_str())
181            && let Some(seq) = path
182                .file_stem()
183                .and_then(|s| s.to_str())
184                .and_then(|s| s.parse::<u32>().ok())
185            && seq > seq_before
186        {
187            match ext {
188                "sst" | "meta" | "blob" | "del" => fs::remove_file(&path)?,
189                _ => {}
190            }
191        }
192    }
193    Ok(())
194}
195
196impl Drop for WriteOperationGuard<'_> {
197    fn drop(&mut self) {
198        if self.succeeded {
199            // Happy path: just release the slot.
200            *self.active.lock() = None;
201            return;
202        }
203
204        // Unhappy path: the operation failed (or was dropped without commit).
205        // Delete every file that was created during this operation (seq > seq_before).
206        match delete_orphan_files(self.path, self.seq_before) {
207            Ok(()) => *self.active.lock() = None,
208            Err(_) => *self.active.lock() = Some(ActiveWriteState::Error),
209        }
210    }
211}
212
213/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
214/// using a single write batch. It allows for concurrent reads.
215pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
216    parallel_scheduler: S,
217    /// The path to the directory where the database is stored
218    path: PathBuf,
219    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
220    /// no modification on the database is performed.
221    read_only: bool,
222    /// The inner state of the database. Writing will update that.
223    inner: RwLock<Inner<FAMILIES>>,
224    /// A flag to indicate if the database is empty (no meta files). This is an atomic mirror of
225    /// `inner.meta_files.is_empty()` to avoid taking a lock on the hot path.
226    is_empty: AtomicBool,
227    /// Tracks whether a write operation is in progress or has permanently failed.
228    /// `None` = idle, `Some(Active)` = in progress, `Some(Error)` = permanently disabled.
229    active_write_operation: Mutex<Option<ActiveWriteState>>,
230    /// Files from superseded commits whose deletion failed (e.g. on Windows due to open memory
231    /// maps) and will be retried on the next commit or at shutdown.
232    /// Protected by `active_write_operation` (only mutated inside a write operation).
233    deferred_deletions: Mutex<Vec<DeferredDeletion>>,
234    /// A cache for decompressed key blocks. Allocated lazily on first read via
235    /// [`Self::key_block_cache`] so write-only or empty sessions never pay the cache's fixed
236    /// hash-table overhead.
237    key_block_cache: OnceLock<BlockCache>,
238    /// A cache for decompressed value blocks. Allocated lazily on first read via
239    /// [`Self::value_block_cache`]; see [`Self::key_block_cache`].
240    value_block_cache: OnceLock<BlockCache>,
241    /// Per-family configuration for file limits.
242    config: DbConfig<FAMILIES>,
243    /// Statistics for the database.
244    #[cfg(feature = "stats")]
245    stats: TrackedStats,
246}
247
248/// The inner state of the database.
249struct Inner<const FAMILIES: usize> {
250    /// The list of meta files in the database. This is used to derive the SST files.
251    meta_files: Vec<MetaFile>,
252    /// The current sequence number for the database.
253    current_sequence_number: u32,
254    /// The in progress set of hashes of keys that have been accessed.
255    /// It will be flushed onto disk (into a meta file) on next commit.
256    /// It's a dashset to allow modification while only tracking a read lock on Inner.
257    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
258}
259
/// Everything a commit needs to know about a finished write operation.
pub struct CommitOptions {
    /// Newly written meta files, each paired with a `u32` (presumably its sequence number).
    new_meta_files: Vec<(u32, File)>,
    /// Newly written SST files, each paired with a `u32` (presumably its sequence number).
    new_sst_files: Vec<(u32, File)>,
    /// Newly written blob files, each paired with a `u32` (presumably its sequence number).
    new_blob_files: Vec<(u32, File)>,
    /// Sequence numbers of SST files that this commit supersedes.
    sst_seq_numbers_to_delete: Vec<u32>,
    /// Sequence numbers of blob files that this commit supersedes.
    blob_seq_numbers_to_delete: Vec<u32>,
    /// Sequence number reported by the write operation.
    sequence_number: u32,
    /// Number of keys written by the operation.
    keys_written: u64,
}
269
270struct OpenOpts<S: ParallelScheduler, const FAMILIES: usize> {
271    path: PathBuf,
272    read_only: bool,
273    parallel_scheduler: S,
274    config: DbConfig<FAMILIES>,
275}
276
277impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
278    /// Open a TurboPersistence database at the given path.
279    /// This will read the directory and might performance cleanup when the database was not closed
280    /// properly. Cleanup only requires to read a few bytes from a few files and to delete
281    /// files, so it's fast.
282    pub fn open(path: PathBuf) -> Result<Self> {
283        Self::open_with_parallel_scheduler(path, Default::default())
284    }
285
286    /// Open a TurboPersistence database at the given path with custom per-family configuration.
287    pub fn open_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
288        Self::open_with_config_and_parallel_scheduler(path, config, Default::default())
289    }
290
291    /// Open a TurboPersistence database at the given path in read only mode.
292    /// This will read the directory. No Cleanup is performed.
293    pub fn open_read_only_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
294        Self::open_read_only_with_parallel_scheduler(path, config, Default::default())
295    }
296
297    /// Construct an empty, read-only `TurboPersistence` that owns no on-disk state and never
298    /// touches the filesystem. Reads return None; writes bail via the existing `read_only` guard.
299    /// Used to provide a "noop" backing storage with the same concrete type as the real one.
300    pub fn empty_in_memory_with_config(config: DbConfig<FAMILIES>) -> Self {
301        // `path` is `PathBuf::new()` but never read because `meta_files` is empty and
302        // `read_only` is true (so no write/compaction path is reachable).
303        Self::new(OpenOpts {
304            path: PathBuf::new(),
305            read_only: true,
306            parallel_scheduler: Default::default(),
307            config,
308        })
309    }
310}
311
312impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
313    fn new(
314        OpenOpts {
315            path,
316            read_only,
317            parallel_scheduler,
318            config,
319        }: OpenOpts<S, FAMILIES>,
320    ) -> Self {
321        Self {
322            parallel_scheduler,
323            path,
324            read_only,
325            inner: RwLock::new(Inner {
326                meta_files: Vec::new(),
327                current_sequence_number: 0,
328                accessed_key_hashes: [(); FAMILIES]
329                    .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
330            }),
331            is_empty: AtomicBool::new(true),
332            active_write_operation: Mutex::new(None),
333            deferred_deletions: Mutex::new(Vec::new()),
334            key_block_cache: OnceLock::new(),
335            value_block_cache: OnceLock::new(),
336            config,
337            #[cfg(feature = "stats")]
338            stats: TrackedStats::default(),
339        }
340    }
341
342    /// Open a TurboPersistence database at the given path.
343    /// This will read the directory and might performance cleanup when the database was not closed
344    /// properly. Cleanup only requires to read a few bytes from a few files and to delete
345    /// files, so it's fast.
346    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
347        Self::open_with_config_and_parallel_scheduler(path, DbConfig::default(), parallel_scheduler)
348    }
349
350    /// Open a TurboPersistence database at the given path with custom per-family configuration.
351    pub fn open_with_config_and_parallel_scheduler(
352        path: PathBuf,
353        config: DbConfig<FAMILIES>,
354        parallel_scheduler: S,
355    ) -> Result<Self> {
356        let mut db = Self::new(OpenOpts {
357            path,
358            read_only: false,
359            parallel_scheduler,
360            config,
361        });
362        db.open_directory(false)?;
363        Ok(db)
364    }
365
366    /// Open a TurboPersistence database at the given path in read only mode.
367    /// This will read the directory. No Cleanup is performed.
368    fn open_read_only_with_parallel_scheduler(
369        path: PathBuf,
370        config: DbConfig<FAMILIES>,
371        parallel_scheduler: S,
372    ) -> Result<Self> {
373        let mut db = Self::new(OpenOpts {
374            path,
375            read_only: true,
376            parallel_scheduler,
377            config,
378        });
379        db.open_directory(false)?;
380        Ok(db)
381    }
382
383    /// Performs the initial check on the database directory.
384    fn open_directory(&mut self, read_only: bool) -> Result<()> {
385        match fs::read_dir(&self.path) {
386            Ok(entries) => {
387                if !self
388                    .load_directory(entries, read_only)
389                    .context("Loading persistence directory failed")?
390                {
391                    if read_only {
392                        bail!("Failed to open database");
393                    }
394                    self.init_directory()
395                        .context("Initializing persistence directory failed")?;
396                }
397                Ok(())
398            }
399            Err(e) => {
400                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
401                    self.create_and_init_directory()
402                        .context("Creating and initializing persistence directory failed")?;
403                    Ok(())
404                } else {
405                    Err(e).context("Failed to open database")
406                }
407            }
408        }
409    }
410
411    /// Creates the directory and initializes it.
412    fn create_and_init_directory(&mut self) -> Result<()> {
413        fs::create_dir_all(&self.path)?;
414        self.init_directory()
415    }
416
417    /// Initializes the directory by creating the CURRENT file.
418    fn init_directory(&mut self) -> Result<()> {
419        let mut current = File::create(self.path.join("CURRENT"))?;
420        current.write_u32::<BE>(0)?;
421        current.flush()?;
422        Ok(())
423    }
424
425    /// Loads an existing database directory and performs cleanup if necessary.
426    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
427        let mut meta_files = Vec::new();
428        let mut current_file = match File::open(self.path.join("CURRENT")) {
429            Ok(file) => file,
430            Err(e) => {
431                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
432                    return Ok(false);
433                } else {
434                    return Err(e).context("Failed to open CURRENT file");
435                }
436            }
437        };
438        let current = current_file.read_u32::<BE>()?;
439        drop(current_file);
440
441        let mut deleted_files = HashSet::new();
442        for entry in entries {
443            let entry = entry?;
444            let path = entry.path();
445            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
446                let seq: u32 = path
447                    .file_stem()
448                    .context("File has no file stem")?
449                    .to_str()
450                    .context("File stem is not valid utf-8")?
451                    .parse()?;
452                if deleted_files.contains(&seq) {
453                    continue;
454                }
455                if seq > current {
456                    if !read_only {
457                        fs::remove_file(&path)?;
458                    }
459                } else {
460                    match ext {
461                        "meta" => {
462                            meta_files.push(seq);
463                        }
464                        "del" => {
465                            let mut content = &*fs::read(&path)?;
466                            let mut no_existing_files = true;
467                            while !content.is_empty() {
468                                let seq = content.read_u32::<BE>()?;
469                                deleted_files.insert(seq);
470                                if !read_only {
471                                    // Remove the files that are marked for deletion
472                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
473                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
474                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
475                                    for path in [sst_file, meta_file, blob_file] {
476                                        if fs::exists(&path)? {
477                                            fs::remove_file(path)?;
478                                            no_existing_files = false;
479                                        }
480                                    }
481                                }
482                            }
483                            if !read_only && no_existing_files {
484                                fs::remove_file(&path)?;
485                            }
486                        }
487                        "blob" | "sst" => {
488                            // ignore blobs and sst, they are read when needed
489                        }
490                        _ => {
491                            if !path
492                                .file_name()
493                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
494                            {
495                                bail!("Unexpected file in persistence directory: {:?}", path);
496                            }
497                        }
498                    }
499                }
500            } else {
501                match path.file_stem().and_then(|s| s.to_str()) {
502                    Some("CURRENT") => {
503                        // Already read
504                    }
505                    Some("LOG") => {
506                        // Ignored, write-only
507                    }
508                    _ => {
509                        if !path
510                            .file_name()
511                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
512                        {
513                            bail!("Unexpected file in persistence directory: {:?}", path);
514                        }
515                    }
516                }
517            }
518        }
519
520        meta_files.retain(|seq| !deleted_files.contains(seq));
521        meta_files.sort_unstable();
522        let mut meta_files = self
523            .parallel_scheduler
524            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
525                let meta_file = MetaFile::open(&self.path, seq)?;
526                Ok(meta_file)
527            })?;
528
529        let mut sst_filter = SstFilter::new();
530        for meta_file in meta_files.iter_mut().rev() {
531            sst_filter.apply_filter(meta_file);
532        }
533
534        let inner = self.inner.get_mut();
535        self.is_empty
536            .store(meta_files.is_empty(), Ordering::Relaxed);
537        inner.meta_files = meta_files;
538        inner.current_sequence_number = current;
539        Ok(true)
540    }
541
542    /// Reads and decompresses a blob file. This is not backed by any cache.
543    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
544    fn read_blob(&self, seq: u32) -> Result<ArcBytes> {
545        let path = self.path.join(format!("{seq:08}.blob"));
546        let file = File::open(&path)
547            .with_context(|| format!("Failed to open blob file {}", path.display()))?;
548        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
549            format!(
550                "Failed to mmap blob file {} ({} bytes)",
551                path.display(),
552                file.metadata().map(|m| m.len()).unwrap_or(0)
553            )
554        })?;
555        #[cfg(unix)]
556        mmap.advise(memmap2::Advice::Sequential)?;
557        #[cfg(unix)]
558        mmap.advise(memmap2::Advice::WillNeed)?;
559        advise_mmap_for_persistence(&mmap)?;
560        let mut reader = &mmap[..];
561        let uncompressed_length = reader
562            .read_u32::<BE>()
563            .context("Failed to read uncompressed length from blob file")?;
564        let expected_checksum = reader.read_u32::<BE>()?;
565
566        // Verify checksum on the compressed on-disk data before decompression.
567        let actual_checksum = checksum_block(reader);
568        if actual_checksum != expected_checksum {
569            bail!(
570                "Cache corruption detected: checksum mismatch in blob file {:08}.blob (expected \
571                 {:08x}, got {:08x})",
572                seq,
573                expected_checksum,
574                actual_checksum
575            );
576        }
577
578        let buffer = decompress_into_arc(uncompressed_length, reader)?;
579        Ok(ArcBytes::from(buffer))
580    }
581
582    /// Returns true if the database is empty.
583    pub fn is_empty(&self) -> bool {
584        self.is_empty.load(Ordering::Relaxed)
585    }
586
587    /// Returns `true` if a previous write or compaction left the database in an unrecoverable error
588    /// state, permanently disabling further writes.
589    pub fn has_unrecoverable_write_error(&self) -> bool {
590        matches!(
591            *self.active_write_operation.lock(),
592            Some(ActiveWriteState::Error)
593        )
594    }
595
596    /// Acquires the write-operation slot, returning an RAII guard that rolls back and releases it
597    /// on drop. Only one write operation (write batch or compaction) is allowed at a time.
598    /// `name` is a short human-readable label used in error messages (e.g. `"write batch"`).
599    fn acquire_write_operation(&self, name: &'static str) -> Result<WriteOperationGuard<'_>> {
600        if self.read_only {
601            bail!("Cannot perform write operations on a read-only database");
602        }
603        let mut slot = self.active_write_operation.lock();
604        match &*slot {
605            Some(ActiveWriteState::Active(active_name)) => {
606                bail!(
607                    "Another {active_name} is already active (only a single write operation is \
608                     allowed at a time)"
609                );
610            }
611            Some(ActiveWriteState::Error) => {
612                bail!(
613                    "A previous write operation failed with an unrecoverable error; no further \
614                     writes are possible"
615                );
616            }
617            None => {}
618        }
619        *slot = Some(ActiveWriteState::Active(name));
620        drop(slot); // release before acquiring inner read lock
621        let seq_before = self.inner.read().current_sequence_number;
622        Ok(WriteOperationGuard {
623            active: &self.active_write_operation,
624            path: &self.path,
625            seq_before,
626            succeeded: false,
627        })
628    }
629
630    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
631    /// time. The WriteBatch need to be committed with [`TurboPersistence::commit_write_batch`].
632    /// Note that the WriteBatch might start writing data to disk while it's filled up with data.
633    /// This data will only become visible after the WriteBatch is committed.
634    pub fn write_batch<K: StoreKey + Send + Sync>(&self) -> Result<WriteBatch<'_, K, S, FAMILIES>> {
635        let guard = self.acquire_write_operation("write batch")?;
636        // seq_before is already the current sequence number, no second read needed.
637        let current = guard.seq_before;
638        Ok(WriteBatch::new(
639            guard,
640            self.path.clone(),
641            current,
642            self.parallel_scheduler.clone(),
643            self.config.family_configs,
644        ))
645    }
646
647    fn key_block_cache(&self) -> &BlockCache {
648        self.key_block_cache.get_or_init(|| {
649            BlockCache::with(
650                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
651                KEY_BLOCK_CACHE_SIZE,
652                Default::default(),
653                Default::default(),
654                Default::default(),
655            )
656        })
657    }
658
659    fn value_block_cache(&self) -> &BlockCache {
660        self.value_block_cache.get_or_init(|| {
661            BlockCache::with(
662                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
663                VALUE_BLOCK_CACHE_SIZE,
664                Default::default(),
665                Default::default(),
666                Default::default(),
667            )
668        })
669    }
670
671    /// Clears all caches of the database.
672    pub fn clear_cache(&self) {
673        self.clear_block_caches();
674        for meta in self.inner.write().meta_files.iter_mut() {
675            meta.clear_cache();
676        }
677    }
678
679    /// Clears block caches of the database. Caches that have not been allocated yet are left
680    /// uninitialized, so clearing never forces allocation.
681    pub fn clear_block_caches(&self) {
682        if let Some(cache) = self.key_block_cache.get() {
683            cache.clear();
684        }
685        if let Some(cache) = self.value_block_cache.get() {
686            cache.clear();
687        }
688    }
689
690    /// Prefetches all SST files which are usually lazy loaded. This can be used to reduce latency
691    /// for the first queries after opening the database.
692    pub fn prepare_all_sst_caches(&self) {
693        for meta in self.inner.write().meta_files.iter_mut() {
694            meta.prepare_sst_cache();
695        }
696    }
697
698    fn open_log(&self) -> Result<BufWriter<File>> {
699        if self.read_only {
700            unreachable!("Only write operations can open the log file");
701        }
702        let log_path = self.path.join("LOG");
703        let log_file = OpenOptions::new()
704            .create(true)
705            .append(true)
706            .open(log_path)?;
707        Ok(BufWriter::new(log_file))
708    }
709
710    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
711    /// visible to readers.
712    pub fn commit_write_batch<K: StoreKey + Send + Sync>(
713        &self,
714        mut write_batch: WriteBatch<'_, K, S, FAMILIES>,
715    ) -> Result<()> {
716        if self.read_only {
717            unreachable!("It's not possible to create a write batch for a read-only database");
718        }
719        let FinishResult {
720            sequence_number,
721            new_meta_files,
722            new_sst_files,
723            new_blob_files,
724            keys_written,
725        } = write_batch.finish(|family| {
726            let inner = self.inner.read();
727            let set = &inner.accessed_key_hashes[family as usize];
728            // len is only a snapshot at that time and it can change while we create the filter.
729            // So we give it 5% more space to make resizes less likely.
730            let initial_capacity = set.len() * 20 / 19;
731            // TODO: Using u64::BITS as fingerprint size is wasteful for a
732            // probabilistic membership filter. A smaller fingerprint (e.g. via
733            // Filter::new with a target fp_rate) would significantly reduce size,
734            // but would make merging slower since mismatched fingerprint sizes
735            // fall back to one-by-one insertion instead of sorted merge.
736            let mut amqf =
737                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
738                    .unwrap();
739            // This drains items from the set. But due to concurrency it might not be empty
740            // afterwards, but that's fine. It will be part of the next commit.
741            set.retain(|hash| {
742                // Performance-wise it would usually be better to insert sorted fingerprints, but we
743                // assume that hashes are equally distributed, which makes it unnecessary.
744                // Good for cache locality is that we insert in the order of the dashset's buckets.
745                amqf.insert_fingerprint(false, *hash)
746                    .expect("Failed to insert fingerprint");
747                false
748            });
749            amqf
750        })?;
751        self.commit(CommitOptions {
752            new_meta_files,
753            new_sst_files,
754            new_blob_files,
755            sst_seq_numbers_to_delete: vec![],
756            blob_seq_numbers_to_delete: vec![],
757            sequence_number,
758            keys_written,
759        })?;
760        // Mark the guard inside the write batch as succeeded so it skips the rollback on drop.
761        write_batch.mark_succeeded();
762        Ok(())
763    }
764
765    /// fsyncs the new files and updates the CURRENT file. Updates the database state to include the
766    /// new files.
767    fn commit(
768        &self,
769        CommitOptions {
770            mut new_meta_files,
771            new_sst_files,
772            new_blob_files,
773            mut sst_seq_numbers_to_delete,
774            mut blob_seq_numbers_to_delete,
775            sequence_number: mut seq,
776            keys_written,
777        }: CommitOptions,
778    ) -> Result<(), anyhow::Error> {
779        let time = Timestamp::now();
780
781        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);
782
783        let sync_span = tracing::trace_span!("sync new files").entered();
784
785        enum SyncItem {
786            Meta(u32, File),
787            Sst(File),
788            Blob(u32, File),
789        }
790        enum SyncResult {
791            Meta(MetaFile),
792            Sst,
793            Blob(u32, File),
794        }
795
796        let mut sync_items: Vec<SyncItem> =
797            Vec::with_capacity(new_meta_files.len() + new_sst_files.len() + new_blob_files.len());
798        for (seq, file) in new_meta_files {
799            sync_items.push(SyncItem::Meta(seq, file));
800        }
801        for (_, file) in new_sst_files {
802            sync_items.push(SyncItem::Sst(file));
803        }
804        for (seq, file) in new_blob_files {
805            sync_items.push(SyncItem::Blob(seq, file));
806        }
807
808        let results: Vec<SyncResult> = self
809            .parallel_scheduler
810            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(sync_items, |item| match item {
811                SyncItem::Meta(seq, file) => {
812                    file.sync_data()?;
813                    let meta_file = MetaFile::open(&self.path, seq)?;
814                    Ok(SyncResult::Meta(meta_file))
815                }
816                SyncItem::Sst(file) => {
817                    file.sync_data()?;
818                    Ok(SyncResult::Sst)
819                }
820                SyncItem::Blob(seq, file) => {
821                    file.sync_data()?;
822                    Ok(SyncResult::Blob(seq, file))
823                }
824            })?;
825
826        let mut new_meta_files: Vec<MetaFile> = Vec::new();
827        let mut new_blob_files: Vec<(u32, File)> = Vec::new();
828        for result in results {
829            match result {
830                SyncResult::Meta(mf) => new_meta_files.push(mf),
831                SyncResult::Sst => {}
832                SyncResult::Blob(seq, file) => new_blob_files.push((seq, file)),
833            }
834        }
835
836        let mut sst_filter = SstFilter::new();
837        for meta_file in new_meta_files.iter_mut().rev() {
838            sst_filter.apply_filter(meta_file);
839        }
840
841        // Sync the directory to ensure the new directory entries (file name → inode mappings)
842        // are durable before we update CURRENT. Without this, a crash could leave CURRENT pointing
843        // to files whose directory entries were lost even though their data was flushed.
844        File::open(&self.path)?.sync_data()?;
845        drop(sync_span);
846
847        let new_meta_info = new_meta_files
848            .iter()
849            .map(|meta| {
850                let ssts = meta
851                    .entries()
852                    .iter()
853                    .map(|entry| {
854                        let seq = entry.sequence_number();
855                        let range = entry.range();
856                        let size = entry.size();
857                        let flags = entry.flags();
858                        (seq, range.min_hash, range.max_hash, size, flags)
859                    })
860                    .collect::<Vec<_>>();
861                (
862                    meta.sequence_number(),
863                    meta.family(),
864                    ssts,
865                    meta.obsolete_sst_files().to_vec(),
866                )
867            })
868            .collect::<Vec<_>>();
869
870        // ── Phase A: compute what will change without modifying inner. ──
871        //
872        // We need `meta_seq_numbers_to_delete` and `has_delete_file` to write
873        // the .del file BEFORE writing CURRENT. We must not modify `inner` at
874        // all — if a disk error occurs before CURRENT is durable, the
875        // WriteOperationGuard rollback can only clean up orphan files, not undo
876        // in-memory mutations. The MetaFile in-memory optimization
877        // (retain_entries) is deferred to Phase C.
878        let has_delete_file;
879        let mut meta_seq_numbers_to_delete = Vec::new();
880        let entries_to_remove;
881
882        {
883            let inner = self.inner.read();
884
885            // (A1) Run the SST filter on existing meta files. This only
886            // updates the SstFilter state — the MetaFile in-memory layout is
887            // not modified yet (that happens in Phase C via retain_entries).
888            // Collects the set of SST entry sequence numbers to remove from
889            // each meta file, keyed by position in `inner.meta_files`.
890            entries_to_remove = inner
891                .meta_files
892                .iter()
893                .rev()
894                .map(|meta_file| sst_filter.apply_filter_collect(meta_file))
895                .collect::<Vec<_>>();
896
897            // (A2) Determine which meta files are fully obsolete by running
898            // `apply_and_get_remove` in newest-first order. Process new metas
899            // first (they are newer than existing ones) to advance the filter
900            // state, then existing ones. New metas are never candidates for
901            // removal (just created), so only their filter-state side-effects
902            // matter.
903            for meta_file in new_meta_files.iter().rev() {
904                let should_remove = sst_filter.apply_and_get_remove(meta_file);
905                debug_assert!(
906                    !should_remove,
907                    "newly created meta file should never be a candidate for removal"
908                );
909            }
910            for i in (0..inner.meta_files.len()).rev() {
911                if sst_filter.apply_and_get_remove(&inner.meta_files[i]) {
912                    meta_seq_numbers_to_delete.push(inner.meta_files[i].sequence_number());
913                }
914            }
915
916            // (A3) Compute the final sequence number that will be written to
917            // CURRENT. A .del file is created only when there are files to
918            // delete, which consumes one extra sequence number.
919            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
920                || !blob_seq_numbers_to_delete.is_empty()
921                || !meta_seq_numbers_to_delete.is_empty();
922        }
923        if has_delete_file {
924            seq += 1;
925        }
926
927        self.parallel_scheduler.block_in_place(|| {
928            if has_delete_file {
929                sst_seq_numbers_to_delete.sort_unstable();
930                meta_seq_numbers_to_delete.sort_unstable();
931                blob_seq_numbers_to_delete.sort_unstable();
932                // Write *.del file, marking the selected files as to delete
933                let mut buf = Vec::with_capacity(
934                    (sst_seq_numbers_to_delete.len()
935                        + meta_seq_numbers_to_delete.len()
936                        + blob_seq_numbers_to_delete.len())
937                        * size_of::<u32>(),
938                );
939                for seq in sst_seq_numbers_to_delete.iter() {
940                    buf.write_u32::<BE>(*seq)?;
941                }
942                for seq in meta_seq_numbers_to_delete.iter() {
943                    buf.write_u32::<BE>(*seq)?;
944                }
945                for seq in blob_seq_numbers_to_delete.iter() {
946                    buf.write_u32::<BE>(*seq)?;
947                }
948                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
949                file.write_all(&buf)?;
950                file.sync_data()?;
951            }
952
953            let mut current_file = OpenOptions::new()
954                .write(true)
955                .truncate(false)
956                .read(false)
957                .open(self.path.join("CURRENT"))?;
958            current_file.write_u32::<BE>(seq)?;
959            current_file.sync_data()?;
960
961            // ── Point of no return ──────────────────────────────────────────
962            //
963            // CURRENT has been durably updated. The commit is now visible to
964            // future readers (including after a crash/restart via
965            // `load_directory`). Everything below is best-effort cleanup:
966            //
967            // • Writing the LOG is purely informational.
968            //
969            // • Superseded files are NOT deleted here — Phase C handles that
970            //   after `inner` is updated. On Linux/macOS they are deleted
971            //   immediately; on Windows (where open memory maps prevent
972            //   deletion) they are retried on the next commit or shutdown.
973            //
974            // Errors here must NOT propagate, because the WriteOperationGuard
975            // would then run its rollback and delete the *newly committed*
976            // files, corrupting the database.
977
978            if let Err(e) = (|| {
979                let mut log = self.open_log()?;
980                writeln!(log, "Time {time}")?;
981                let span = time.until(Timestamp::now())?;
982                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
983                writeln!(log, "FAM | META SEQ | SST SEQ         | RANGE")?;
984                for (meta_seq, family, ssts, obsolete) in new_meta_info {
985                    for (seq, min, max, size, flags) in ssts {
986                        writeln!(
987                            log,
988                            "{family:3} | {meta_seq:08} | {seq:08} SST    | {} ({} MiB, {})",
989                            range_to_str(min, max),
990                            size / 1024 / 1024,
991                            flags
992                        )?;
993                    }
994                    for obsolete in obsolete.chunks(15) {
995                        write!(log, "{family:3} | {meta_seq:08} |")?;
996                        for seq in obsolete {
997                            write!(log, " {seq:08}")?;
998                        }
999                        writeln!(log, " OBSOLETE SST")?;
1000                    }
1001                }
1002
1003                fn write_seq_numbers<W: std::io::Write, T>(
1004                    log: &mut W,
1005                    items: &[T],
1006                    label: &str,
1007                    extract_seq: fn(&T) -> u32,
1008                ) -> std::io::Result<()> {
1009                    for chunk in items.chunks(15) {
1010                        write!(log, "    |          |")?;
1011                        for item in chunk {
1012                            write!(log, " {:08}", extract_seq(item))?;
1013                        }
1014                        writeln!(log, " {}", label)?;
1015                    }
1016                    Ok(())
1017                }
1018
1019                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
1020                write_seq_numbers(&mut log, &new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
1021                write_seq_numbers(
1022                    &mut log,
1023                    &blob_seq_numbers_to_delete,
1024                    "BLOB DELETED",
1025                    |&seq| seq,
1026                )?;
1027                write_seq_numbers(
1028                    &mut log,
1029                    &sst_seq_numbers_to_delete,
1030                    "SST DELETED",
1031                    |&seq| seq,
1032                )?;
1033                write_seq_numbers(
1034                    &mut log,
1035                    &meta_seq_numbers_to_delete,
1036                    "META DELETED",
1037                    |&seq| seq,
1038                )?;
1039                anyhow::Ok(())
1040            })() {
1041                eprintln!("turbo-persistence: failed to write LOG after commit {seq:08}: {e:#}");
1042            }
1043
1044            anyhow::Ok(())
1045        })?;
1046
1047        // ── Phase C: structurally update inner (CURRENT is already durable). ──
1048        //
1049        // Between Phase A's read-lock drop and this point no other writer can
1050        // run (WriteOperationGuard ensures exclusivity) and readers never mutate
1051        // inner, so the snapshot from Phase A is still valid.
1052        {
1053            let mut inner = self.inner.write();
1054
1055            // Apply the deferred MetaFile mutations from Phase A1. apply_filter
1056            // was called read-only earlier; now we actually move superseded
1057            // entries from active to obsolete inside each MetaFile.
1058            // entries_to_remove was collected in reverse order, so iterate it
1059            // in reverse to match the forward order of inner.meta_files.
1060            for (meta_file, to_remove) in inner
1061                .meta_files
1062                .iter_mut()
1063                .zip(entries_to_remove.into_iter().rev())
1064            {
1065                if !to_remove.is_empty() {
1066                    meta_file.retain_entries(|seq| !to_remove.contains(&seq));
1067                }
1068            }
1069
1070            inner.meta_files.append(&mut new_meta_files);
1071            if !meta_seq_numbers_to_delete.is_empty() {
1072                let to_delete: HashSet<u32> = meta_seq_numbers_to_delete.iter().copied().collect();
1073                inner
1074                    .meta_files
1075                    .retain(|meta| !to_delete.contains(&meta.sequence_number()));
1076            }
1077            inner.current_sequence_number = seq;
1078            self.is_empty
1079                .store(inner.meta_files.is_empty(), Ordering::Relaxed);
1080        }
1081
1082        // Try to delete superseded files immediately. On Linux/macOS this always
1083        // works even if readers have the files memory-mapped. On Windows, open
1084        // memory maps prevent deletion; any file that fails is kept in
1085        // `deferred_deletions` and retried on the next commit or at shutdown.
1086        self.deferred_deletions.lock().extend(
1087            Self::try_delete_files(&self.path, &sst_seq_numbers_to_delete, "sst")
1088                .map(DeferredDeletion::Sst)
1089                .chain(
1090                    Self::try_delete_files(&self.path, &meta_seq_numbers_to_delete, "meta")
1091                        .map(DeferredDeletion::Meta),
1092                )
1093                .chain(
1094                    Self::try_delete_files(&self.path, &blob_seq_numbers_to_delete, "blob")
1095                        .map(DeferredDeletion::Blob),
1096                ),
1097        );
1098
1099        // Retry any deletions that failed in earlier commits.
1100        self.retry_deferred_deletions();
1101
1102        // Best-effort verbose log of the new database state after Phase C.
1103        #[cfg(feature = "verbose_log")]
1104        {
1105            let _: Result<(), _> = (|| -> anyhow::Result<()> {
1106                let mut log = self.open_log()?;
1107                writeln!(log, "New database state:")?;
1108                writeln!(log, "FAM | META SEQ | SST SEQ  FLAGS | RANGE")?;
1109                let inner = self.inner.read();
1110                let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
1111                    let mut set = HashSet::new();
1112                    move |family| set.insert(*family)
1113                });
1114                for family in families {
1115                    for meta in inner.meta_files.iter() {
1116                        if meta.family() != family {
1117                            continue;
1118                        }
1119                        let meta_seq = meta.sequence_number();
1120                        for entry in meta.entries().iter() {
1121                            let seq = entry.sequence_number();
1122                            let range = entry.range();
1123                            writeln!(
1124                                log,
1125                                "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
1126                                entry.flags(),
1127                                range_to_str(range.min_hash, range.max_hash)
1128                            )?;
1129                        }
1130                    }
1131                }
1132                Ok(())
1133            })();
1134        }
1135
1136        Ok(())
1137    }
1138
1139    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
1140    /// duplicate keys and separating all key ranges into unique files.
1141    pub fn full_compact(&self) -> Result<()> {
1142        self.compact(&CompactConfig {
1143            min_merge_count: 2,
1144            optimal_merge_count: usize::MAX,
1145            max_merge_count: usize::MAX,
1146            max_merge_bytes: u64::MAX,
1147            min_merge_duplication_bytes: 0,
1148            optimal_merge_duplication_bytes: u64::MAX,
1149            max_merge_segment_count: usize::MAX,
1150        })?;
1151        Ok(())
1152    }
1153
1154    /// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
1155    /// files is above the given threshold. The coverage is the average number of SST files that
1156    /// need to be read to find a key. It also limits the maximum number of SST files that are
1157    /// merged at once, which is the main factor for the runtime of the compaction.
1158    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
1159        let mut guard = self.acquire_write_operation("compaction")?;
1160
1161        // Free block caches and SST mmaps before compaction. The block caches
1162        // are not used during compaction (we iterate uncached), and any cached
1163        // SST mmaps would use MADV_RANDOM which is wrong for sequential scans.
1164        // Clearing them upfront frees memory for the merge work.
1165        self.clear_cache();
1166
1167        let mut sequence_number;
1168        let mut new_meta_files = Vec::new();
1169        let mut new_sst_files = Vec::new();
1170        let mut sst_seq_numbers_to_delete = Vec::new();
1171        let mut blob_seq_numbers_to_delete = Vec::new();
1172        let mut keys_written = 0;
1173
1174        {
1175            let inner = self.inner.read();
1176            sequence_number = AtomicU32::new(inner.current_sequence_number);
1177            self.compact_internal(
1178                &inner.meta_files,
1179                &sequence_number,
1180                &mut new_meta_files,
1181                &mut new_sst_files,
1182                &mut sst_seq_numbers_to_delete,
1183                &mut blob_seq_numbers_to_delete,
1184                &mut keys_written,
1185                compact_config,
1186            )
1187            .context("Failed to compact database")?;
1188        }
1189
1190        let has_changes = !new_meta_files.is_empty();
1191        if has_changes {
1192            self.commit(CommitOptions {
1193                new_meta_files,
1194                new_sst_files,
1195                new_blob_files: Vec::new(),
1196                sst_seq_numbers_to_delete,
1197                blob_seq_numbers_to_delete,
1198                sequence_number: *sequence_number.get_mut(),
1199                keys_written,
1200            })
1201            .context("Failed to commit the database compaction")?;
1202        }
1203
1204        guard.success();
1205        Ok(has_changes)
1206    }
1207
1208    /// Internal function to perform a compaction.
1209    fn compact_internal(
1210        &self,
1211        meta_files: &[MetaFile],
1212        sequence_number: &AtomicU32,
1213        new_meta_files: &mut Vec<(u32, File)>,
1214        new_sst_files: &mut Vec<(u32, File)>,
1215        sst_seq_numbers_to_delete: &mut Vec<u32>,
1216        blob_seq_numbers_to_delete: &mut Vec<u32>,
1217        keys_written: &mut u64,
1218        compact_config: &CompactConfig,
1219    ) -> Result<()> {
1220        if meta_files.is_empty() {
1221            return Ok(());
1222        }
1223
1224        struct SstWithRange {
1225            meta_index: usize,
1226            index_in_meta: u32,
1227            seq: u32,
1228            range: StaticSortedFileRange,
1229            size: u64,
1230            flags: MetaEntryFlags,
1231        }
1232
1233        impl Compactable for SstWithRange {
1234            fn range(&self) -> RangeInclusive<u64> {
1235                self.range.min_hash..=self.range.max_hash
1236            }
1237
1238            fn size(&self) -> u64 {
1239                self.size
1240            }
1241
1242            fn category(&self) -> u8 {
1243                // Cold and non-cold files are placed separately so we pass different category
1244                // values to ensure they are not merged together.
1245                if self.flags.cold() { 1 } else { 0 }
1246            }
1247        }
1248
1249        let ssts_with_ranges = meta_files
1250            .iter()
1251            .enumerate()
1252            .flat_map(|(meta_index, meta)| {
1253                meta.entries()
1254                    .iter()
1255                    .enumerate()
1256                    .map(move |(index_in_meta, entry)| SstWithRange {
1257                        meta_index,
1258                        index_in_meta: index_in_meta as u32,
1259                        seq: entry.sequence_number(),
1260                        range: entry.range(),
1261                        size: entry.size(),
1262                        flags: entry.flags(),
1263                    })
1264            })
1265            .collect::<Vec<_>>();
1266
1267        let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
1268
1269        for sst in ssts_with_ranges {
1270            sst_by_family[sst.range.family as usize].push(sst);
1271        }
1272
1273        let path = &self.path;
1274
1275        let log_mutex = Mutex::new(());
1276
1277        struct PartialResultPerFamily {
1278            new_meta_file: Option<(u32, File)>,
1279            new_sst_files: Vec<(u32, File)>,
1280            sst_seq_numbers_to_delete: Vec<u32>,
1281            blob_seq_numbers_to_delete: Vec<u32>,
1282            keys_written: u64,
1283        }
1284
1285        let mut compact_config = compact_config.clone();
1286        let merge_jobs = sst_by_family
1287            .into_iter()
1288            .enumerate()
1289            .filter_map(|(family, ssts_with_ranges)| {
1290                if compact_config.max_merge_segment_count == 0 {
1291                    return None;
1292                }
1293                let (merge_jobs, real_merge_job_size) =
1294                    get_merge_segments(&ssts_with_ranges, &compact_config);
1295                compact_config.max_merge_segment_count -= real_merge_job_size;
1296                Some((family, ssts_with_ranges, merge_jobs))
1297            })
1298            .collect::<Vec<_>>();
1299
1300        let result = self
1301            .parallel_scheduler
1302            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
1303                merge_jobs,
1304                |(family, ssts_with_ranges, merge_jobs)| {
1305                    let family = family as u32;
1306
1307                    if merge_jobs.is_empty() {
1308                        return Ok(PartialResultPerFamily {
1309                            new_meta_file: None,
1310                            new_sst_files: Vec::new(),
1311                            sst_seq_numbers_to_delete: Vec::new(),
1312                            blob_seq_numbers_to_delete: Vec::new(),
1313                            keys_written: 0,
1314                        });
1315                    }
1316
1317                    // Deserialize and merge used key hash filters per-family into
1318                    // a single filter. This avoids O(entries × N) filter probes
1319                    // during the merge loop. Empty filters (from commits with no
1320                    // reads) are discarded.
1321                    let used_key_hashes: Option<qfilter::Filter> = {
1322                        let filters: Vec<qfilter::FilterRef<'_>> = meta_files
1323                            .iter()
1324                            .filter(|m| m.family() == family)
1325                            .filter_map(|meta_file| {
1326                                meta_file.deserialize_used_key_hashes_amqf().transpose()
1327                            })
1328                            .collect::<Result<Vec<_>>>()?
1329                            .into_iter()
1330                            .filter(|amqf| !amqf.is_empty())
1331                            .collect();
1332                        if filters.is_empty() {
1333                            None
1334                        } else if filters.len() == 1 {
1335                            // Just directly use the single item
1336                            Some(filters[0].to_owned())
1337                        } else {
1338                            let total_len: u64 = filters.iter().map(|f| f.len()).sum();
1339                            // Fingerprint size must match the source filters to
1340                            // enable the efficient sorted merge path in qfilter.
1341                            let mut merged =
1342                                qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
1343                                    .expect("Failed to create merged AMQF filter");
1344                            for filter in &filters {
1345                                merged
1346                                    .merge(false, filter)
1347                                    .expect("Failed to merge AMQF filters");
1348                            }
1349                            merged.shrink_to_fit();
1350                            Some(merged)
1351                        }
1352                    };
1353
1354                    // Later we will remove the merged files
1355                    let sst_seq_numbers_to_delete = merge_jobs
1356                        .iter()
1357                        .filter(|l| l.len() > 1)
1358                        .flat_map(|l| l.iter().copied())
1359                        .map(|index| ssts_with_ranges[index].seq)
1360                        .collect::<Vec<_>>();
1361
1362                    // Merge SST files
1363                    let span = tracing::trace_span!(
1364                        "merge files",
1365                        family = self.config.family_configs[family as usize].name
1366                    );
1367                    enum PartialMergeResult<'l> {
1368                        Merged {
1369                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1370                            blob_seq_numbers_to_delete: Vec<u32>,
1371                            keys_written: u64,
1372                            indices: SmallVec<[usize; 1]>,
1373                        },
1374                        Move {
1375                            seq: u32,
1376                            meta: StaticSortedFileBuilderMeta<'l>,
1377                        },
1378                    }
1379                    let merge_result = self
1380                        .parallel_scheduler
1381                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
1382                            let _span = span.clone().entered();
1383                            if indices.len() == 1 {
1384                                // If we only have one file, we can just move it
1385                                let index = indices[0];
1386                                let meta_index = ssts_with_ranges[index].meta_index;
1387                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
1388                                let meta_file = &meta_files[meta_index];
1389                                let entry = meta_file.entry(index_in_meta);
1390                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
1391                                let meta = StaticSortedFileBuilderMeta {
1392                                    min_hash: entry.min_hash(),
1393                                    max_hash: entry.max_hash(),
1394                                    amqf,
1395                                    block_count: entry.block_count(),
1396                                    size: entry.size(),
1397                                    flags: entry.flags(),
1398                                    entries: 0,
1399                                };
1400                                return Ok(PartialMergeResult::Move {
1401                                    seq: entry.sequence_number(),
1402                                    meta,
1403                                });
1404                            }
1405
1406                            // Open SST files independently for compaction.
1407                            // Uses MADV_SEQUENTIAL for better OS page management
1408                            // and avoids caching mmaps on MetaEntry's OnceLock.
1409                            let iters = indices
1410                                .iter()
1411                                .map(|&index| {
1412                                    let meta_index = ssts_with_ranges[index].meta_index;
1413                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
1414                                    let entry = meta_files[meta_index].entry(index_in_meta);
1415                                    StaticSortedFileIter::open(path, entry.sst_metadata())
1416                                })
1417                                .collect::<Result<Vec<_>>>()?;
1418
1419                            let iter = MergeIter::new(iters.into_iter())?;
1420
1421                            let mut blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
1422
1423                            struct Collector {
1424                                /// The active writer and its sequence number. `None` if no
1425                                /// entries have been added since the last flush. We defer
1426                                /// allocation to avoid creating empty SST files for collectors
1427                                /// that receive no entries (e.g., the unused_collector when
1428                                /// all keys are in the
1429                                /// used set).
1430                                writer: Option<(u32, StreamingSstWriter<LookupEntry>)>,
1431                                flags: MetaEntryFlags,
1432                                new_sst_files:
1433                                    Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1434                                /// Hash of the last key added. Used to ensure we only split
1435                                /// SST files at key boundaries (not mid-key-group for MultiValue).
1436                                last_hash: Option<u64>,
1437                            }
1438                            impl Collector {
1439                                fn new(flags: MetaEntryFlags) -> Self {
1440                                    Self {
1441                                        writer: None,
1442                                        flags,
1443                                        new_sst_files: Vec::new(),
1444                                        last_hash: None,
1445                                    }
1446                                }
1447
1448                                /// Ensures a writer is open, creating one if needed.
1449                                fn ensure_writer(
1450                                    &mut self,
1451                                    path: &Path,
1452                                    sequence_number: &AtomicU32,
1453                                ) -> Result<&mut StreamingSstWriter<LookupEntry>>
1454                                {
1455                                    if self.writer.is_none() {
1456                                        let seq =
1457                                            sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1458                                        let sst_path = path.join(format!("{seq:08}.sst"));
1459                                        let writer = StreamingSstWriter::new(
1460                                            &sst_path,
1461                                            self.flags,
1462                                            MAX_ENTRIES_PER_COMPACTED_FILE as u64,
1463                                        )?;
1464                                        self.writer = Some((seq, writer));
1465                                    }
1466                                    Ok(&mut self.writer.as_mut().unwrap().1)
1467                                }
1468
1469                                /// Closes the current SST file (flushing remaining blocks and
1470                                /// writing the index) and records it in the completed files
1471                                /// list.
1472                                fn close_sst_file(&mut self, keys_written: &mut u64) -> Result<()> {
1473                                    if let Some((seq, writer)) = self.writer.take() {
1474                                        let _span =
1475                                            tracing::trace_span!("close merged sst file").entered();
1476                                        let (meta, file) = writer.close()?;
1477                                        *keys_written += meta.entries;
1478                                        self.new_sst_files.push((seq, file, meta));
1479                                    }
1480                                    Ok(())
1481                                }
1482
1483                                /// Adds an entry to the collector. Only splits the SST file at
1484                                /// key boundaries to avoid breaking key groups for MultiValue
1485                                /// families.
1486                                fn add_entry(
1487                                    &mut self,
1488                                    entry: LookupEntry,
1489                                    path: &Path,
1490                                    sequence_number: &AtomicU32,
1491                                    keys_written: &mut u64,
1492                                ) -> Result<()> {
1493                                    let key_changed = self.last_hash != Some(entry.hash);
1494                                    // Only check fullness at key boundaries to avoid splitting
1495                                    // a key group across two SST files.
1496                                    if key_changed
1497                                        && let Some((_, ref writer)) = self.writer
1498                                        && writer.is_full(
1499                                            MAX_ENTRIES_PER_COMPACTED_FILE,
1500                                            DATA_THRESHOLD_PER_COMPACTED_FILE,
1501                                        )
1502                                    {
1503                                        self.close_sst_file(keys_written)?;
1504                                    }
1505                                    self.last_hash = Some(entry.hash);
1506                                    let writer = self.ensure_writer(path, sequence_number)?;
1507                                    writer.add(entry)?;
1508                                    Ok(())
1509                                }
1510                            }
1511                            #[cfg(debug_assertions)]
1512                            impl Drop for Collector {
1513                                fn drop(&mut self) {
1514                                    if !std::thread::panicking() {
1515                                        assert!(
1516                                            self.writer.is_none(),
1517                                            "Collector dropped with an open writer"
1518                                        );
1519                                    }
1520                                }
1521                            }
1522                            let mut used_collector = Collector::new(MetaEntryFlags::WARM);
1523                            let mut unused_collector = Collector::new(MetaEntryFlags::COLD);
1524                            let mut current_key: Option<RcBytes> = None;
1525                            let mut keys_written = 0;
1526
1527                            // MergeIter yields entries from newer SSTs first (by SST sequence
1528                            // number). Within each SST, tombstones sort last within key groups.
1529                            // Use a skip flag to handle:
1530                            // - SingleValue: skip all older entries after writing the first
1531                            // - MultiValue: skip all older entries after encountering a tombstone
1532                            //   (which signals deletion of all prior values for this key)
1533                            let mut skip_remaining_for_this_key = false;
1534                            let family_config = &self.config.family_configs[family as usize];
1535
1536                            for entry in iter {
1537                                let entry = entry?;
1538                                if current_key.as_ref() != Some(&entry.key) {
1539                                    // we changed keys so undo this flag
1540                                    skip_remaining_for_this_key = false;
1541                                    current_key = Some(entry.key.clone());
1542                                }
1543                                if !skip_remaining_for_this_key {
1544                                    let is_used = used_key_hashes
1545                                        .as_ref()
1546                                        .is_some_and(|amqf| amqf.contains_fingerprint(entry.hash));
1547                                    let collector = if is_used {
1548                                        &mut used_collector
1549                                    } else {
1550                                        &mut unused_collector
1551                                    };
1552                                    match family_config.kind {
1553                                        FamilyKind::MultiValue => {
1554                                            // For MultiValue families we only skip remaining if we
1555                                            // see a tombstone
1556                                            if matches!(entry.value, IterValue::Deleted) {
1557                                                skip_remaining_for_this_key = true;
1558                                            }
1559                                        }
1560                                        FamilyKind::SingleValue => {
                                            // Since MergeIter yields entries in newest to oldest
                                            // order, anything else that comes out for this key
                                            // must be skipped
1563                                            skip_remaining_for_this_key = true;
1564                                        }
1565                                    }
1566                                    collector.add_entry(
1567                                        entry,
1568                                        path,
1569                                        sequence_number,
1570                                        &mut keys_written,
1571                                    )?;
1572                                } else {
1573                                    // Entry is being dropped (superseded by newer entry or
1574                                    // pruned by tombstone). If it references a blob file,
1575                                    // mark that blob for deletion.
1576                                    if let IterValue::Blob { sequence_number } = &entry.value {
1577                                        blob_seq_numbers_to_delete.push(*sequence_number);
1578                                    }
1579                                }
1580                            }
1581
1582                            // Close remaining writers
1583                            used_collector.close_sst_file(&mut keys_written)?;
1584                            unused_collector.close_sst_file(&mut keys_written)?;
1585
1586                            let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1587                            new_sst_files.append(&mut used_collector.new_sst_files);
1588                            Ok(PartialMergeResult::Merged {
1589                                new_sst_files,
1590                                blob_seq_numbers_to_delete,
1591                                keys_written,
1592                                indices,
1593                            })
1594                        })
1595                        .with_context(|| {
1596                            format!("Failed to merge database files for family {family}")
1597                        })?;
1598
1599                    let Some((sst_files_len, blob_delete_len)) = merge_result
1600                        .iter()
1601                        .map(|r| {
1602                            if let PartialMergeResult::Merged {
1603                                new_sst_files,
1604                                blob_seq_numbers_to_delete,
1605                                indices: _,
1606                                keys_written: _,
1607                            } = r
1608                            {
1609                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
1610                            } else {
1611                                (0, 0)
1612                            }
1613                        })
1614                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
1615                    else {
1616                        unreachable!()
1617                    };
1618
1619                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
1620                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
1621
1622                    let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1623                    let mut meta_file_builder = MetaFileBuilder::new(family);
1624
1625                    let mut keys_written = 0;
1626                    self.parallel_scheduler.block_in_place(|| {
1627                        let guard = log_mutex.lock();
1628                        let mut log = self.open_log()?;
1629                        writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
1630                        for result in merge_result {
1631                            match result {
1632                                PartialMergeResult::Merged {
1633                                    new_sst_files: merged_new_sst_files,
1634                                    blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
1635                                    keys_written: merged_keys_written,
1636                                    indices,
1637                                } => {
1638                                    writeln!(
1639                                        log,
1640                                        "{family:3} | {meta_seq:08} | MERGE \
1641                                         ({merged_keys_written} keys):"
1642                                    )?;
1643                                    for i in indices.iter() {
1644                                        let seq = ssts_with_ranges[*i].seq;
1645                                        let (min, max) = ssts_with_ranges[*i].range().into_inner();
1646                                        writeln!(
1647                                            log,
1648                                            "{family:3} | {meta_seq:08} | {seq:08} INPUT  | {}",
1649                                            range_to_str(min, max)
1650                                        )?;
1651                                    }
1652                                    for (seq, file, meta) in merged_new_sst_files {
1653                                        let min = meta.min_hash;
1654                                        let max = meta.max_hash;
1655                                        writeln!(
1656                                            log,
1657                                            "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
1658                                             ({})",
1659                                            range_to_str(min, max),
1660                                            meta.flags
1661                                        )?;
1662
1663                                        meta_file_builder.add(seq, meta);
1664                                        new_sst_files.push((seq, file));
1665                                    }
1666                                    blob_seq_numbers_to_delete
1667                                        .extend(merged_blob_seq_numbers_to_delete);
1668                                    keys_written += merged_keys_written;
1669                                }
1670                                PartialMergeResult::Move { seq, meta } => {
1671                                    let min = meta.min_hash;
1672                                    let max = meta.max_hash;
1673                                    writeln!(
1674                                        log,
1675                                        "{family:3} | {meta_seq:08} | {seq:08} MOVED  | {}",
1676                                        range_to_str(min, max)
1677                                    )?;
1678
1679                                    meta_file_builder.add(seq, meta);
1680                                }
1681                            }
1682                        }
1683                        drop(log);
1684                        drop(guard);
1685
1686                        anyhow::Ok(())
1687                    })?;
1688
1689                    for &seq in sst_seq_numbers_to_delete.iter() {
1690                        meta_file_builder.add_obsolete_sst_file(seq);
1691                    }
1692
1693                    let meta_file = {
1694                        let _span = tracing::trace_span!("write meta file").entered();
1695                        self.parallel_scheduler
1696                            .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
1697                    };
1698
1699                    Ok(PartialResultPerFamily {
1700                        new_meta_file: Some((meta_seq, meta_file)),
1701                        new_sst_files,
1702                        sst_seq_numbers_to_delete,
1703                        blob_seq_numbers_to_delete,
1704                        keys_written,
1705                    })
1706                },
1707            )?;
1708
1709        for PartialResultPerFamily {
1710            new_meta_file: inner_new_meta_file,
1711            new_sst_files: mut inner_new_sst_files,
1712            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
1713            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
1714            keys_written: inner_keys_written,
1715        } in result
1716        {
1717            new_meta_files.extend(inner_new_meta_file);
1718            new_sst_files.append(&mut inner_new_sst_files);
1719            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
1720            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
1721            *keys_written += inner_keys_written;
1722        }
1723
1724        Ok(())
1725    }
1726
1727    /// Get a value from the database. Returns None if the key is not found. The returned value
1728    /// might hold onto a block of the database and it should not be hold long-term.
1729    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcBytes>> {
1730        debug_assert!(family < FAMILIES, "Family index out of bounds");
1731        if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1732            // This is an error in our caller so just panic
1733            panic!(
1734                "only single valued tables can be queried with `get', call `get_multiple` instead"
1735            )
1736        }
1737        let span = tracing::trace_span!(
1738            "database read",
1739            name = self.config.family_configs[family].name,
1740            result_size = tracing::field::Empty
1741        )
1742        .entered();
1743        let results = self.get_impl::<K, false>(family, key, &span)?;
1744        debug_assert!(results.len() <= 1, "get() should return at most one result");
1745        Ok(results.into_iter().next())
1746    }
1747
1748    /// Looks up a key and returns all matching values.
1749    ///
1750    /// This is useful for keyspaces where keys are not unique and multiple mappings are possible.
1751    /// Unlike `get`, which returns only the first match, this method returns all
1752    /// entries with the same key from all SST files.  By default however we assume these
1753    /// collections are small and thus optimize for there being exactly 0 or 1 results.
1754    ///
1755    /// The order of returned values is undefined and duplicates are preserved. Callers must not
1756    /// rely on any particular ordering (neither insertion order nor byte order).
1757    pub fn get_multiple<K: QueryKey>(
1758        &self,
1759        family: usize,
1760        key: &K,
1761    ) -> Result<SmallVec<[ArcBytes; 1]>> {
1762        debug_assert!(family < FAMILIES, "Family index out of bounds");
1763        if self.config.family_configs[family].kind != FamilyKind::MultiValue {
1764            // This is an error in our caller so just panic
1765            panic!("only multi-valued tables can be queried with `get_multiple`")
1766        }
1767        let span = tracing::trace_span!(
1768            "database read multiple",
1769            name = self.config.family_configs[family].name,
1770            result_count = tracing::field::Empty,
1771            result_size = tracing::field::Empty
1772        )
1773        .entered();
1774        let results = self.get_impl::<K, true>(family, key, &span)?;
1775        Ok(results)
1776    }
1777
1778    /// Shared implementation for `get` and `get_multiple`.
1779    ///
1780    /// If `FIND_ALL` is false, stops after finding the first match.
1781    /// If `FIND_ALL` is true, continues to find all matches across all meta files.
1782    fn get_impl<K: QueryKey, const FIND_ALL: bool>(
1783        &self,
1784        family: usize,
1785        key: &K,
1786        span: &EnteredSpan,
1787    ) -> Result<SmallVec<[ArcBytes; 1]>> {
1788        let hash = hash_key(key);
1789        let inner = self.inner.read();
1790        let mut output: SmallVec<[ArcBytes; 1]> = SmallVec::new();
1791        // Track whether we found the key in any SST (even if deleted).
1792        // Used for miss_global stat: only fires if key was never found anywhere.
1793        #[cfg(feature = "stats")]
1794        let mut found_in_sst = false;
1795
1796        let mut size = 0;
1797
1798        for meta in inner.meta_files.iter().rev() {
1799            match meta.lookup::<K, FIND_ALL>(
1800                family as u32,
1801                hash,
1802                key,
1803                self.key_block_cache(),
1804                self.value_block_cache(),
1805            )? {
1806                MetaLookupResult::FamilyMiss => {
1807                    #[cfg(feature = "stats")]
1808                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1809                }
1810                MetaLookupResult::RangeMiss => {
1811                    #[cfg(feature = "stats")]
1812                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
1813                }
1814                MetaLookupResult::QuickFilterMiss => {
1815                    #[cfg(feature = "stats")]
1816                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
1817                }
1818                MetaLookupResult::SstLookup(result) => match result {
1819                    SstLookupResult::Found(values) => {
1820                        #[cfg(feature = "stats")]
1821                        {
1822                            found_in_sst = true;
1823                        }
1824                        inner.accessed_key_hashes[family].insert(hash);
1825                        // Process values. Tombstones sort last within a key group,
1826                        // so when we see a tombstone, we can return immediately.
1827                        for value in values {
1828                            match value {
1829                                LookupValue::Deleted => {
1830                                    #[cfg(feature = "stats")]
1831                                    self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1832                                    if !FIND_ALL {
1833                                        span.record("result_size", "deleted");
1834                                        return Ok(SmallVec::new());
1835                                    }
1836                                    // Tombstone is last in key group. Return accumulated
1837                                    // values (from this SST and newer layers). Stop
1838                                    // searching older SSTs.
1839                                    if output.is_empty() {
1840                                        span.record("result_size", "deleted");
1841                                    } else {
1842                                        span.record("result_size", size);
1843                                    }
1844                                    return Ok(output);
1845                                }
1846                                LookupValue::Slice { value } => {
1847                                    #[cfg(feature = "stats")]
1848                                    self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1849                                    if !FIND_ALL {
1850                                        span.record("result_size", value.len());
1851                                        return Ok(SmallVec::from_buf([value]));
1852                                    }
1853                                    size += value.len();
1854                                    output.push(value);
1855                                }
1856                                LookupValue::Blob { sequence_number } => {
1857                                    #[cfg(feature = "stats")]
1858                                    self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1859                                    let blob = self.read_blob(sequence_number)?;
1860                                    if !FIND_ALL {
1861                                        span.record("result_size", blob.len());
1862                                        return Ok(SmallVec::from_buf([blob]));
1863                                    }
1864                                    size += blob.len();
1865                                    output.push(blob);
1866                                }
1867                            }
1868                        }
1869                    }
1870                    SstLookupResult::NotFound => {
1871                        #[cfg(feature = "stats")]
1872                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
1873                    }
1874                },
1875            }
1876        }
1877
1878        #[cfg(feature = "stats")]
1879        if !found_in_sst {
1880            self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1881        }
1882
1883        if FIND_ALL {
1884            span.record("result_count", output.len());
1885        }
1886        if output.is_empty() {
1887            span.record("result_size", "not_found");
1888        } else {
1889            span.record("result_size", size);
1890        }
1891        Ok(output)
1892    }
1893
1894    pub fn batch_get<K: QueryKey>(
1895        &self,
1896        family: usize,
1897        keys: &[K],
1898    ) -> Result<Vec<Option<ArcBytes>>> {
1899        debug_assert!(family < FAMILIES, "Family index out of bounds");
1900        if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1901            // This is an error in our caller so just panic
1902            panic!("only single valued tables can be queried with `batch_get'")
1903        }
1904        let span = tracing::trace_span!(
1905            "database batch read",
1906            name = self.config.family_configs[family].name,
1907            keys = keys.len(),
1908            not_found = tracing::field::Empty,
1909            deleted = tracing::field::Empty,
1910            result_size = tracing::field::Empty
1911        )
1912        .entered();
1913        let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
1914        let mut empty_cells = keys.len();
1915        for (index, key) in keys.iter().enumerate() {
1916            let hash = hash_key(key);
1917            cells.push((hash, index, None));
1918        }
1919        cells.sort_by_key(|(hash, _, _)| *hash);
1920        let inner = self.inner.read();
1921        for meta in inner.meta_files.iter().rev() {
1922            let _result = meta.batch_lookup(
1923                family as u32,
1924                keys,
1925                &mut cells,
1926                &mut empty_cells,
1927                self.key_block_cache(),
1928                self.value_block_cache(),
1929            )?;
1930
1931            #[cfg(feature = "stats")]
1932            {
1933                let crate::meta_file::MetaBatchLookupResult {
1934                    family_miss,
1935                    range_misses,
1936                    quick_filter_misses,
1937                    sst_misses,
1938                    hits: _,
1939                } = _result;
1940                if family_miss {
1941                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1942                }
1943                if range_misses > 0 {
1944                    self.stats
1945                        .miss_range
1946                        .fetch_add(range_misses as u64, Ordering::Relaxed);
1947                }
1948                if quick_filter_misses > 0 {
1949                    self.stats
1950                        .miss_amqf
1951                        .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
1952                }
1953                if sst_misses > 0 {
1954                    self.stats
1955                        .miss_key
1956                        .fetch_add(sst_misses as u64, Ordering::Relaxed);
1957                }
1958            }
1959
1960            if empty_cells == 0 {
1961                break;
1962            }
1963        }
1964        let mut deleted = 0;
1965        let mut not_found = 0;
1966        let mut result_size = 0;
1967        let mut results = vec![None; keys.len()];
1968        for (hash, index, result) in cells {
1969            if let Some(result) = result {
1970                inner.accessed_key_hashes[family].insert(hash);
1971                let result = match result {
1972                    LookupValue::Deleted => {
1973                        #[cfg(feature = "stats")]
1974                        self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1975                        deleted += 1;
1976                        None
1977                    }
1978                    LookupValue::Slice { value } => {
1979                        #[cfg(feature = "stats")]
1980                        self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1981                        result_size += value.len();
1982                        Some(value)
1983                    }
1984                    LookupValue::Blob { sequence_number } => {
1985                        #[cfg(feature = "stats")]
1986                        self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1987                        let blob = self.read_blob(sequence_number)?;
1988                        result_size += blob.len();
1989                        Some(blob)
1990                    }
1991                };
1992                results[index] = result;
1993            } else {
1994                #[cfg(feature = "stats")]
1995                self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1996                not_found += 1;
1997            }
1998        }
1999        span.record("not_found", not_found);
2000        span.record("deleted", deleted);
2001        span.record("result_size", result_size);
2002        Ok(results)
2003    }
2004
2005    /// Returns database statistics.
2006    #[cfg(feature = "stats")]
2007    pub fn statistics(&self) -> Statistics {
2008        let inner = self.inner.read();
2009        Statistics {
2010            meta_files: inner.meta_files.len(),
2011            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
2012            key_block_cache: CacheStatistics::new(self.key_block_cache()),
2013            value_block_cache: CacheStatistics::new(self.value_block_cache()),
2014            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
2015                + self.stats.hits_small.load(Ordering::Relaxed)
2016                + self.stats.hits_blob.load(Ordering::Relaxed),
2017            misses: self.stats.miss_global.load(Ordering::Relaxed),
2018            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
2019            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
2020            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
2021            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
2022        }
2023    }
2024
2025    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
2026        Ok(self
2027            .inner
2028            .read()
2029            .meta_files
2030            .iter()
2031            .rev()
2032            .map(|meta_file| {
2033                let entries = meta_file
2034                    .entries()
2035                    .iter()
2036                    .map(|entry| {
2037                        let amqf = entry.raw_amqf(meta_file.amqf_data());
2038                        MetaFileEntryInfo {
2039                            sequence_number: entry.sequence_number(),
2040                            min_hash: entry.min_hash(),
2041                            max_hash: entry.max_hash(),
2042                            sst_size: entry.size(),
2043                            flags: entry.flags(),
2044                            amqf_size: entry.amqf_size(),
2045                            amqf_entries: amqf.len(),
2046                            block_count: entry.block_count(),
2047                        }
2048                    })
2049                    .collect();
2050                MetaFileInfo {
2051                    sequence_number: meta_file.sequence_number(),
2052                    family: meta_file.family(),
2053                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
2054                    entries,
2055                }
2056            })
2057            .collect())
2058    }
2059
2060    /// Shuts down the database. This will print statistics if the `print_stats` feature is enabled.
2061    /// Retries deletion of all previously-deferred files and clears successfully deleted batches.
2062    pub fn shutdown(&self) -> Result<()> {
2063        #[cfg(feature = "print_stats")]
2064        println!("{:#?}", self.statistics());
2065        self.retry_deferred_deletions();
2066        Ok(())
2067    }
2068
2069    /// Attempts to delete files with the given extension, returning an iterator of sequence
2070    /// numbers for files that could not be deleted (e.g. due to open memory maps on Windows).
2071    fn try_delete_files<'a>(
2072        dir: &'a Path,
2073        seqs: &'a [u32],
2074        ext: &'a str,
2075    ) -> impl Iterator<Item = u32> + 'a {
2076        seqs.iter()
2077            .copied()
2078            .filter(move |&seq| fs::remove_file(dir.join(format!("{seq:08}.{ext}"))).is_err())
2079    }
2080
2081    /// Retries deletion of files that previously failed (typically due to open memory maps on
2082    /// Windows). Any file that still fails is kept for the next retry.
2083    /// Best-effort: persistent failures are acceptable because `load_directory` cleans up
2084    /// any leftover files on the next open via the `.del` file.
2085    fn retry_deferred_deletions(&self) {
2086        let mut deferred = self.deferred_deletions.lock();
2087        deferred.retain(|entry| {
2088            let (seq, ext) = match *entry {
2089                DeferredDeletion::Sst(seq) => (seq, "sst"),
2090                DeferredDeletion::Meta(seq) => (seq, "meta"),
2091                DeferredDeletion::Blob(seq) => (seq, "blob"),
2092            };
2093            // Keep the entry only if deletion still fails.
2094            fs::remove_file(self.path.join(format!("{seq:08}.{ext}"))).is_err()
2095        });
2096    }
2097}
2098
/// Renders the hash range `min..=max` as a fixed-width ASCII bar followed by the bounds in
/// hexadecimal.
///
/// The bar is 100 columns wide and represents the whole `u64` space. `[` marks the column of
/// `min`, `]` the column of `max`, `=` fills the columns strictly between them, and `O` is
/// used when both bounds fall into the same column. The bar is followed by
/// ` | {min:016x}-{max:016x}`.
///
/// A bound of `u64::MAX` is mapped to the last column, so the marker for the upper end of the
/// hash space is always visible.
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // " | " + 16 hex digits + "-" + 16 hex digits.
    const SUFFIX_SIZE: usize = 3 + 16 + 1 + 16;

    // Scale in u128 to avoid overflow. The scaled value reaches DISPLAY_SIZE only for
    // `u64::MAX`, which is clamped into the last column instead of falling off the bar.
    let column = |hash: u64| {
        let scaled = hash as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128;
        (scaled as usize).min(DISPLAY_SIZE - 1)
    };
    let start_pos = column(min);
    let end_pos = column(max);

    let mut range_str = String::with_capacity(DISPLAY_SIZE + SUFFIX_SIZE);
    range_str.extend(
        (0..DISPLAY_SIZE).map(|i| match (i == start_pos, i == end_pos) {
            (true, true) => 'O',
            (true, false) => '[',
            (false, true) => ']',
            (false, false) if i > start_pos && i < end_pos => '=',
            (false, false) => ' ',
        }),
    );
    write!(range_str, " | {min:016x}-{max:016x}").expect("writing to a String cannot fail");
    range_str
}
2122
2123pub struct MetaFileInfo {
2124    pub sequence_number: u32,
2125    pub family: u32,
2126    pub obsolete_sst_files: Vec<u32>,
2127    pub entries: Vec<MetaFileEntryInfo>,
2128}
2129
2130pub struct MetaFileEntryInfo {
2131    pub sequence_number: u32,
2132    pub min_hash: u64,
2133    pub max_hash: u64,
2134    pub amqf_size: u32,
2135    pub amqf_entries: usize,
2136    pub sst_size: u64,
2137    pub flags: MetaEntryFlags,
2138    pub block_count: u16,
2139}