Skip to main content

turbo_persistence/
db.rs

1use std::{
2    borrow::Cow,
3    collections::HashSet,
4    fs::{self, File, OpenOptions, ReadDir},
5    io::{BufWriter, Write},
6    mem::take,
7    ops::RangeInclusive,
8    path::{Path, PathBuf},
9    sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use dashmap::DashSet;
15use jiff::Timestamp;
16use memmap2::Mmap;
17use nohash_hasher::BuildNoHashHasher;
18use parking_lot::{Mutex, RwLock};
19use smallvec::SmallVec;
20use tracing::span::EnteredSpan;
21
22pub use crate::compaction::selector::CompactConfig;
23use crate::{
24    DbConfig, FamilyKind, QueryKey,
25    arc_bytes::ArcBytes,
26    compaction::selector::{Compactable, get_merge_segments},
27    compression::{checksum_block, decompress_into_arc},
28    constants::{
29        DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
30        MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
31    },
32    key::{StoreKey, hash_key},
33    lookup_entry::{IterValue, LookupEntry, LookupValue},
34    merge_iter::MergeIter,
35    meta_file::{MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
36    meta_file_builder::MetaFileBuilder,
37    mmap_helper::advise_mmap_for_persistence,
38    parallel_scheduler::ParallelScheduler,
39    rc_bytes::RcBytes,
40    sst_filter::SstFilter,
41    static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFileIter},
42    static_sorted_file_builder::{StaticSortedFileBuilderMeta, StreamingSstWriter},
43    write_batch::{FinishResult, WriteBatch},
44};
45
/// Point-in-time statistics snapshot of a single block cache (key or value cache).
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Fraction of lookups that were hits: `hits / (hits + misses)`.
    pub hit_rate: f32,
    /// How full the cache is: current weight divided by capacity.
    pub fill: f32,
    /// Number of entries currently stored in the cache.
    pub items: usize,
    /// Total weight of all cached entries.
    pub size: u64,
    /// Number of cache hits recorded so far.
    pub hits: u64,
    /// Number of cache misses recorded so far.
    pub misses: u64,
}
56
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Builds a statistics snapshot from a `quick_cache` cache.
    ///
    /// The counters are read individually and may be slightly inconsistent with
    /// each other under concurrent access, which is acceptable for statistics.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        let lookups = hits + misses;
        Self {
            // Guard against 0/0 = NaN before any lookup has happened.
            hit_rate: if lookups == 0 {
                0.0
            } else {
                hits as f32 / lookups as f32
            },
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
80
/// Aggregated database statistics.
///
/// NOTE(review): the miss_* field semantics are inferred from their names and the
/// matching counters in `TrackedStats`; the lookup path that increments them is
/// outside this chunk — confirm before relying on the descriptions.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of meta files currently loaded.
    pub meta_files: usize,
    /// Number of SST files referenced by the meta files.
    pub sst_files: usize,
    /// Statistics of the decompressed key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics of the decompressed value block cache.
    pub value_block_cache: CacheStatistics,
    /// Total successful lookups.
    pub hits: u64,
    /// Total failed lookups.
    pub misses: u64,
    /// Misses attributed to the key family check.
    pub miss_family: u64,
    /// Misses attributed to the hash range check.
    pub miss_range: u64,
    /// Misses attributed to the AMQF filter.
    pub miss_amqf: u64,
    /// Misses attributed to the key lookup itself.
    pub miss_key: u64,
}
95
/// Internal hit/miss counters, split by outcome so [`Statistics`] can report both
/// aggregate and per-cause numbers.
///
/// NOTE(review): the exact meaning of each counter is determined by the lookup
/// path, which is outside this chunk; names are taken at face value.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}
108
/// State of the active write slot.
///
/// The slot itself is an `Option<ActiveWriteState>`, where `None` means the
/// database is idle and a new write operation may start.
enum ActiveWriteState {
    /// A write operation or compaction is in progress.
    /// The string is a human-readable name used in error messages.
    Active(&'static str),
    /// A previous write or compaction failed and recovery also failed.
    /// No further writes are possible.
    Error,
}
118
/// A single superseded file whose deletion failed and is being retried.
///
/// On Linux/macOS, deleting a memory-mapped file is safe and this list is
/// normally empty. On Windows, open memory maps prevent deletion; failed files
/// are collected here and retried on the next commit or shutdown.
enum DeferredDeletion {
    /// Sequence number of a static sorted table file (`{seq:08}.sst`).
    Sst(u32),
    /// Sequence number of a meta file (`{seq:08}.meta`).
    Meta(u32),
    /// Sequence number of a blob file (`{seq:08}.blob`).
    Blob(u32),
}
129
/// RAII guard for an active write operation.
///
/// When dropped without [`WriteOperationGuard::success`] being called first, the guard rolls back
/// the operation by deleting any files whose sequence number exceeds `seq_before` (the sequence
/// number at the time the operation started). If rollback itself fails the write slot is set to
/// [`ActiveWriteState::Error`], permanently disabling further writes.
///
/// Created by `acquire_write_operation`; the guard borrows from the database, so it
/// cannot outlive the [`TurboPersistence`] it was created from.
pub(crate) struct WriteOperationGuard<'a> {
    /// Reference to the active-write-operation slot, so we can clear or error it on drop.
    active: &'a Mutex<Option<ActiveWriteState>>,
    /// Database directory path, needed for orphan-file deletion during rollback.
    path: &'a Path,
    /// Sequence number at the time the operation started (= the last committed seq on disk).
    /// Files with seq > this were created by the current operation and must be deleted on
    /// rollback.
    seq_before: u32,
    /// Set to `true` by [`WriteOperationGuard::success`] to skip rollback on drop.
    succeeded: bool,
}
148
impl WriteOperationGuard<'_> {
    /// Mark the operation as successfully completed.
    ///
    /// After this call the guard's `Drop` impl will release the write slot without rolling back.
    /// Call this only once the operation's results are committed (see
    /// `commit_write_batch`, which calls this after `commit` returned successfully).
    pub(crate) fn success(&mut self) {
        self.succeeded = true;
    }
}
157
158/// Deletes all files in `path` whose numeric stem is greater than `seq_before`.
159///
160/// Called on rollback to clean up any SST, meta, blob, or del files written during a
161/// failed write operation or compaction.
162fn delete_orphan_files(path: &Path, seq_before: u32) -> Result<()> {
163    // Restore CURRENT to seq_before first. The failure may have happened mid-write
164    // to CURRENT, leaving it partially written. Writing seq_before makes the
165    // on-disk state consistent before we start deleting orphan files.
166    let mut current_file = OpenOptions::new()
167        .write(true)
168        .truncate(false)
169        .read(false)
170        .open(path.join("CURRENT"))?;
171    current_file.write_u32::<BE>(seq_before)?;
172    current_file.sync_all()?;
173
174    for entry in fs::read_dir(path)? {
175        let entry = entry?;
176        let path = entry.path();
177        if let Some(ext) = path.extension().and_then(|s| s.to_str())
178            && let Some(seq) = path
179                .file_stem()
180                .and_then(|s| s.to_str())
181                .and_then(|s| s.parse::<u32>().ok())
182            && seq > seq_before
183        {
184            match ext {
185                "sst" | "meta" | "blob" | "del" => fs::remove_file(&path)?,
186                _ => {}
187            }
188        }
189    }
190    Ok(())
191}
192
193impl Drop for WriteOperationGuard<'_> {
194    fn drop(&mut self) {
195        if self.succeeded {
196            // Happy path: just release the slot.
197            *self.active.lock() = None;
198            return;
199        }
200
201        // Unhappy path: the operation failed (or was dropped without commit).
202        // Delete every file that was created during this operation (seq > seq_before).
203        match delete_orphan_files(self.path, self.seq_before) {
204            Ok(()) => *self.active.lock() = None,
205            Err(_) => *self.active.lock() = Some(ActiveWriteState::Error),
206        }
207    }
208}
209
/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
/// using a single write batch. It allows for concurrent reads.
///
/// On disk the database is a directory containing a `CURRENT` file (last committed
/// sequence number) and numbered `.meta`, `.sst`, `.blob` and `.del` files; see
/// `load_directory` for how they are interpreted.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    parallel_scheduler: S,
    /// The path to the directory where the database is stored
    path: PathBuf,
    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
    /// no modification on the database is performed.
    read_only: bool,
    /// The inner state of the database. Writing will update that.
    inner: RwLock<Inner<FAMILIES>>,
    /// A flag to indicate if the database is empty (no meta files). This is an atomic mirror of
    /// `inner.meta_files.is_empty()` to avoid taking a lock on the hot path.
    is_empty: AtomicBool,
    /// Tracks whether a write operation is in progress or has permanently failed.
    /// `None` = idle, `Some(Active)` = in progress, `Some(Error)` = permanently disabled.
    active_write_operation: Mutex<Option<ActiveWriteState>>,
    /// Files from superseded commits whose deletion failed (e.g. on Windows due to open memory
    /// maps) and will be retried on the next commit or at shutdown.
    /// Protected by `active_write_operation` (only mutated inside a write operation).
    deferred_deletions: Mutex<Vec<DeferredDeletion>>,
    /// A cache for decompressed key blocks.
    key_block_cache: BlockCache,
    /// A cache for decompressed value blocks.
    value_block_cache: BlockCache,
    /// Per-family configuration for file limits.
    config: DbConfig<FAMILIES>,
    /// Statistics for the database.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
241
/// The inner state of the database.
struct Inner<const FAMILIES: usize> {
    /// The list of meta files in the database. This is used to derive the SST files.
    /// Sorted by ascending sequence number when loaded (see `load_directory`).
    /// NOTE(review): confirm the commit path preserves this order.
    meta_files: Vec<MetaFile>,
    /// The current sequence number for the database.
    current_sequence_number: u32,
    /// The in progress set of hashes of keys that have been accessed.
    /// It will be flushed onto disk (into a meta file) on next commit.
    /// It's a dashset to allow modification while only tracking a read lock on Inner.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}
253
/// Arguments to the internal `commit` step: the files created by a write operation
/// or compaction, plus the sequence numbers of files they supersede.
pub struct CommitOptions {
    /// New meta files as `(sequence number, open file handle)`; synced during commit.
    new_meta_files: Vec<(u32, File)>,
    /// New static sorted table files as `(sequence number, open file handle)`.
    new_sst_files: Vec<(u32, File)>,
    /// New blob files as `(sequence number, open file handle)`.
    new_blob_files: Vec<(u32, File)>,
    /// Sequence numbers of superseded SST files (empty for plain write batches).
    sst_seq_numbers_to_delete: Vec<u32>,
    /// Sequence numbers of superseded blob files (empty for plain write batches).
    blob_seq_numbers_to_delete: Vec<u32>,
    /// The sequence number this commit advances the database to.
    sequence_number: u32,
    /// Number of keys written by the operation.
    /// NOTE(review): the consumer is outside this chunk — presumably logging.
    keys_written: u64,
}
263
impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup when the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting
    /// files, so it's fast.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Open a TurboPersistence database at the given path with custom per-family configuration.
    pub fn open_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, config, Default::default())
    }

    /// Open a TurboPersistence database at the given path in read only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, config, Default::default())
    }
}
284
285impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
286    fn new(
287        path: PathBuf,
288        read_only: bool,
289        parallel_scheduler: S,
290        config: DbConfig<FAMILIES>,
291    ) -> Self {
292        Self {
293            parallel_scheduler,
294            path,
295            read_only,
296            inner: RwLock::new(Inner {
297                meta_files: Vec::new(),
298                current_sequence_number: 0,
299                accessed_key_hashes: [(); FAMILIES]
300                    .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
301            }),
302            is_empty: AtomicBool::new(true),
303            active_write_operation: Mutex::new(None),
304            deferred_deletions: Mutex::new(Vec::new()),
305            key_block_cache: BlockCache::with(
306                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
307                KEY_BLOCK_CACHE_SIZE,
308                Default::default(),
309                Default::default(),
310                Default::default(),
311            ),
312            value_block_cache: BlockCache::with(
313                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
314                VALUE_BLOCK_CACHE_SIZE,
315                Default::default(),
316                Default::default(),
317                Default::default(),
318            ),
319            config,
320            #[cfg(feature = "stats")]
321            stats: TrackedStats::default(),
322        }
323    }
324
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup when the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting
    /// files, so it's fast.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, DbConfig::default(), parallel_scheduler)
    }
332
333    /// Open a TurboPersistence database at the given path with custom per-family configuration.
334    pub fn open_with_config_and_parallel_scheduler(
335        path: PathBuf,
336        config: DbConfig<FAMILIES>,
337        parallel_scheduler: S,
338    ) -> Result<Self> {
339        let mut db = Self::new(path, false, parallel_scheduler, config);
340        db.open_directory(false)?;
341        Ok(db)
342    }
343
344    /// Open a TurboPersistence database at the given path in read only mode.
345    /// This will read the directory. No Cleanup is performed.
346    fn open_read_only_with_parallel_scheduler(
347        path: PathBuf,
348        config: DbConfig<FAMILIES>,
349        parallel_scheduler: S,
350    ) -> Result<Self> {
351        let mut db = Self::new(path, true, parallel_scheduler, config);
352        db.open_directory(false)?;
353        Ok(db)
354    }
355
356    /// Performs the initial check on the database directory.
357    fn open_directory(&mut self, read_only: bool) -> Result<()> {
358        match fs::read_dir(&self.path) {
359            Ok(entries) => {
360                if !self
361                    .load_directory(entries, read_only)
362                    .context("Loading persistence directory failed")?
363                {
364                    if read_only {
365                        bail!("Failed to open database");
366                    }
367                    self.init_directory()
368                        .context("Initializing persistence directory failed")?;
369                }
370                Ok(())
371            }
372            Err(e) => {
373                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
374                    self.create_and_init_directory()
375                        .context("Creating and initializing persistence directory failed")?;
376                    Ok(())
377                } else {
378                    Err(e).context("Failed to open database")
379                }
380            }
381        }
382    }
383
    /// Creates the directory (including any missing parents) and initializes it.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
389
390    /// Initializes the directory by creating the CURRENT file.
391    fn init_directory(&mut self) -> Result<()> {
392        let mut current = File::create(self.path.join("CURRENT"))?;
393        current.write_u32::<BE>(0)?;
394        current.flush()?;
395        Ok(())
396    }
397
    /// Loads an existing database directory and performs cleanup if necessary.
    ///
    /// Returns `Ok(false)` when no CURRENT file exists (the directory is not an
    /// initialized database) and `Ok(true)` once the state has been loaded.
    ///
    /// Cleanup (skipped when `read_only` is true):
    /// - files with a sequence number greater than CURRENT are leftovers of an
    ///   uncommitted operation and are deleted,
    /// - files listed in `.del` files are superseded and are deleted.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Database files are named by their sequence number.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Newer than CURRENT: written by an uncommitted (crashed)
                    // operation, remove during cleanup.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A del file is a list of superseded sequence numbers,
                            // stored as big-endian u32s.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    // Remove the files that are marked for deletion
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            // Only drop the del file once none of the files it lists
                            // exist. NOTE(review): presumably this keeps the marker
                            // replayable if a crash happens before the removals above
                            // become durable — confirm against the commit path.
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // ignore blobs and sst, they are read when needed
                        }
                        _ => {
                            // Tolerate hidden files (e.g. `.DS_Store`); anything else
                            // indicates a foreign or corrupted directory.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read
                    }
                    Some("LOG") => {
                        // Ignored, write-only
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        // Del entries may have been seen after the meta files they refer to.
        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        // Opening meta files does real I/O, so parallelize it.
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Apply the SST filter from newest to oldest meta file.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        self.is_empty
            .store(meta_files.is_empty(), Ordering::Relaxed);
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
514
    /// Reads and decompresses a blob file. This is not backed by any cache.
    ///
    /// Blob file layout: uncompressed length (u32 BE), checksum of the compressed
    /// payload (u32 BE), then the compressed payload itself.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcBytes> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let file = File::open(&path)
            .with_context(|| format!("Failed to open blob file {}", path.display()))?;
        // SAFETY/NOTE(review): mapping is sound only if the file is not truncated or
        // modified while mapped; blob files appear to be written once and later only
        // deleted — confirm before changing the write path.
        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
            format!(
                "Failed to mmap blob file {} ({} bytes)",
                path.display(),
                file.metadata().map(|m| m.len()).unwrap_or(0)
            )
        })?;
        // The file is consumed front to back exactly once; hint that to the OS.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        advise_mmap_for_persistence(&mmap)?;
        let mut reader = &mmap[..];
        let uncompressed_length = reader
            .read_u32::<BE>()
            .context("Failed to read uncompressed length from blob file")?;
        let expected_checksum = reader.read_u32::<BE>()?;

        // Verify checksum on the compressed on-disk data before decompression.
        let actual_checksum = checksum_block(reader);
        if actual_checksum != expected_checksum {
            bail!(
                "Cache corruption detected: checksum mismatch in blob file {:08}.blob (expected \
                 {:08x}, got {:08x})",
                seq,
                expected_checksum,
                actual_checksum
            );
        }

        let buffer = decompress_into_arc(uncompressed_length, reader)?;
        Ok(ArcBytes::from(buffer))
    }
554
    /// Returns true if the database is empty (it has no meta files).
    ///
    /// Reads the atomic mirror of `inner.meta_files.is_empty()`, so no lock is
    /// taken; the value can be momentarily stale while a commit is in flight.
    pub fn is_empty(&self) -> bool {
        self.is_empty.load(Ordering::Relaxed)
    }
559
560    /// Returns `true` if a previous write or compaction left the database in an unrecoverable error
561    /// state, permanently disabling further writes.
562    pub fn has_unrecoverable_write_error(&self) -> bool {
563        matches!(
564            *self.active_write_operation.lock(),
565            Some(ActiveWriteState::Error)
566        )
567    }
568
569    /// Acquires the write-operation slot, returning an RAII guard that rolls back and releases it
570    /// on drop. Only one write operation (write batch or compaction) is allowed at a time.
571    /// `name` is a short human-readable label used in error messages (e.g. `"write batch"`).
572    fn acquire_write_operation(&self, name: &'static str) -> Result<WriteOperationGuard<'_>> {
573        if self.read_only {
574            bail!("Cannot perform write operations on a read-only database");
575        }
576        let mut slot = self.active_write_operation.lock();
577        match &*slot {
578            Some(ActiveWriteState::Active(active_name)) => {
579                bail!(
580                    "Another {active_name} is already active (only a single write operation is \
581                     allowed at a time)"
582                );
583            }
584            Some(ActiveWriteState::Error) => {
585                bail!(
586                    "A previous write operation failed with an unrecoverable error; no further \
587                     writes are possible"
588                );
589            }
590            None => {}
591        }
592        *slot = Some(ActiveWriteState::Active(name));
593        drop(slot); // release before acquiring inner read lock
594        let seq_before = self.inner.read().current_sequence_number;
595        Ok(WriteOperationGuard {
596            active: &self.active_write_operation,
597            path: &self.path,
598            seq_before,
599            succeeded: false,
600        })
601    }
602
603    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
604    /// time. The WriteBatch need to be committed with [`TurboPersistence::commit_write_batch`].
605    /// Note that the WriteBatch might start writing data to disk while it's filled up with data.
606    /// This data will only become visible after the WriteBatch is committed.
607    pub fn write_batch<K: StoreKey + Send + Sync>(&self) -> Result<WriteBatch<'_, K, S, FAMILIES>> {
608        let guard = self.acquire_write_operation("write batch")?;
609        // seq_before is already the current sequence number, no second read needed.
610        let current = guard.seq_before;
611        Ok(WriteBatch::new(
612            guard,
613            self.path.clone(),
614            current,
615            self.parallel_scheduler.clone(),
616            self.config.family_configs,
617        ))
618    }
619
620    /// Clears all caches of the database.
621    pub fn clear_cache(&self) {
622        self.key_block_cache.clear();
623        self.value_block_cache.clear();
624        for meta in self.inner.write().meta_files.iter_mut() {
625            meta.clear_cache();
626        }
627    }
628
    /// Clears block caches of the database.
    ///
    /// Unlike [`TurboPersistence::clear_cache`] this leaves the per-meta-file caches
    /// untouched and therefore does not take the inner write lock.
    pub fn clear_block_caches(&self) {
        self.key_block_cache.clear();
        self.value_block_cache.clear();
    }
634
635    /// Prefetches all SST files which are usually lazy loaded. This can be used to reduce latency
636    /// for the first queries after opening the database.
637    pub fn prepare_all_sst_caches(&self) {
638        for meta in self.inner.write().meta_files.iter_mut() {
639            meta.prepare_sst_cache();
640        }
641    }
642
643    fn open_log(&self) -> Result<BufWriter<File>> {
644        if self.read_only {
645            unreachable!("Only write operations can open the log file");
646        }
647        let log_path = self.path.join("LOG");
648        let log_file = OpenOptions::new()
649            .create(true)
650            .append(true)
651            .open(log_path)?;
652        Ok(BufWriter::new(log_file))
653    }
654
    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
    /// visible to readers.
    ///
    /// While finishing, an AMQF (approximate membership query filter) is built per
    /// key family from the key hashes accessed since the last commit; the callback
    /// below drains `accessed_key_hashes` into that filter.
    pub fn commit_write_batch<K: StoreKey + Send + Sync>(
        &self,
        mut write_batch: WriteBatch<'_, K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            // A read lock suffices: the DashSet allows mutation while only holding a
            // read lock on Inner (see the field docs on `accessed_key_hashes`).
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // len is only a snapshot at that time and it can change while we create the filter.
            // So we give it 5% more space to make resizes less likely.
            let initial_capacity = set.len() * 20 / 19;
            // TODO: Using u64::BITS as fingerprint size is wasteful for a
            // probabilistic membership filter. A smaller fingerprint (e.g. via
            // Filter::new with a target fp_rate) would significantly reduce size,
            // but would make merging slower since mismatched fingerprint sizes
            // fall back to one-by-one insertion instead of sorted merge.
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // This drains items from the set. But due to concurrency it might not be empty
            // afterwards, but that's fine. It will be part of the next commit.
            set.retain(|hash| {
                // Performance-wise it would usually be better to insert sorted fingerprints, but we
                // assume that hashes are equally distributed, which makes it unnecessary.
                // Good for cache locality is that we insert in the order of the dashset's buckets.
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            // A plain write batch never supersedes existing files; only compaction
            // fills these.
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        // Mark the guard inside the write batch as succeeded so it skips the rollback on drop.
        write_batch.mark_succeeded();
        Ok(())
    }
709
    /// fsyncs the new files and updates the CURRENT file. Updates the database state to include the
    /// new files.
    ///
    /// Durability protocol, in order:
    /// 1. fsync every new meta/SST/blob file in parallel, then fsync the directory itself so the
    ///    new directory entries are durable.
    /// 2. If anything is superseded, write and fsync a `{seq}.del` file listing the sequence
    ///    numbers of SST, meta and blob files to delete (consuming one extra sequence number).
    /// 3. Overwrite and fsync `CURRENT` — the commit point. Everything after that is best-effort
    ///    and must not propagate errors, or the caller's rollback guard would delete files that
    ///    are already committed.
    /// 4. Update the in-memory state (`inner`) and try to delete superseded files on disk.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let sync_span = tracing::trace_span!("sync new files").entered();

        // Work items for the parallel fsync pass below, and their per-kind outcomes.
        // Meta files are re-opened (mmap'd) after syncing; SSTs need no further
        // handling; blob files are carried through for logging.
        enum SyncItem {
            Meta(u32, File),
            Sst(File),
            Blob(u32, File),
        }
        enum SyncResult {
            Meta(MetaFile),
            Sst,
            Blob(u32, File),
        }

        let mut sync_items: Vec<SyncItem> =
            Vec::with_capacity(new_meta_files.len() + new_sst_files.len() + new_blob_files.len());
        for (seq, file) in new_meta_files {
            sync_items.push(SyncItem::Meta(seq, file));
        }
        for (_, file) in new_sst_files {
            sync_items.push(SyncItem::Sst(file));
        }
        for (seq, file) in new_blob_files {
            sync_items.push(SyncItem::Blob(seq, file));
        }

        let results: Vec<SyncResult> = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(sync_items, |item| match item {
                SyncItem::Meta(seq, file) => {
                    file.sync_data()?;
                    let meta_file = MetaFile::open(&self.path, seq)?;
                    Ok(SyncResult::Meta(meta_file))
                }
                SyncItem::Sst(file) => {
                    file.sync_data()?;
                    Ok(SyncResult::Sst)
                }
                SyncItem::Blob(seq, file) => {
                    file.sync_data()?;
                    Ok(SyncResult::Blob(seq, file))
                }
            })?;

        // Split the sync results back out. `new_meta_files` is rebound from
        // (u32, File) pairs to the opened MetaFile handles; `new_blob_files`
        // keeps only what the LOG section below needs.
        let mut new_meta_files: Vec<MetaFile> = Vec::new();
        let mut new_blob_files: Vec<(u32, File)> = Vec::new();
        for result in results {
            match result {
                SyncResult::Meta(mf) => new_meta_files.push(mf),
                SyncResult::Sst => {}
                SyncResult::Blob(seq, file) => new_blob_files.push((seq, file)),
            }
        }

        // Prime the SST filter with the new meta files, processed newest-first.
        // The same filter state is reused below in Phase A for the existing
        // meta files.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        // Sync the directory to ensure the new directory entries (file name → inode mappings)
        // are durable before we update CURRENT. Without this, a crash could leave CURRENT pointing
        // to files whose directory entries were lost even though their data was flushed.
        File::open(&self.path)?.sync_data()?;
        drop(sync_span);

        // Snapshot of per-meta-file info (sequence number, family, SST entry
        // ranges/sizes/flags, obsolete SSTs). Consumed only by the LOG writer
        // below.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        // ── Phase A: compute what will change without modifying inner. ──
        //
        // We need `meta_seq_numbers_to_delete` and `has_delete_file` to write
        // the .del file BEFORE writing CURRENT. We must not modify `inner` at
        // all — if a disk error occurs before CURRENT is durable, the
        // WriteOperationGuard rollback can only clean up orphan files, not undo
        // in-memory mutations. The MetaFile in-memory optimization
        // (retain_entries) is deferred to Phase C.
        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();
        let entries_to_remove;

        {
            let inner = self.inner.read();

            // (A1) Run the SST filter on existing meta files. This only
            // updates the SstFilter state — the MetaFile in-memory layout is
            // not modified yet (that happens in Phase C via retain_entries).
            // Collects the set of SST entry sequence numbers to remove from
            // each meta file, keyed by position in `inner.meta_files`.
            // NOTE: iterated in reverse, so the resulting Vec is newest-first;
            // Phase C reverses it again to line up with `inner.meta_files`.
            entries_to_remove = inner
                .meta_files
                .iter()
                .rev()
                .map(|meta_file| sst_filter.apply_filter_collect(meta_file))
                .collect::<Vec<_>>();

            // (A2) Determine which meta files are fully obsolete by running
            // `apply_and_get_remove` in newest-first order. Process new metas
            // first (they are newer than existing ones) to advance the filter
            // state, then existing ones. New metas are never candidates for
            // removal (just created), so only their filter-state side-effects
            // matter.
            for meta_file in new_meta_files.iter().rev() {
                let should_remove = sst_filter.apply_and_get_remove(meta_file);
                debug_assert!(
                    !should_remove,
                    "newly created meta file should never be a candidate for removal"
                );
            }
            for i in (0..inner.meta_files.len()).rev() {
                if sst_filter.apply_and_get_remove(&inner.meta_files[i]) {
                    meta_seq_numbers_to_delete.push(inner.meta_files[i].sequence_number());
                }
            }

            // (A3) Compute the final sequence number that will be written to
            // CURRENT. A .del file is created only when there are files to
            // delete, which consumes one extra sequence number.
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
        }
        if has_delete_file {
            seq += 1;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // Write *.del file, marking the selected files as to delete.
                // Layout: big-endian u32 sequence numbers — SSTs, then metas,
                // then blobs.
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_data()?;
            }

            // Overwrite CURRENT in place with the new latest sequence number
            // (a single big-endian u32), then fsync it.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_data()?;

            // ── Point of no return ──────────────────────────────────────────
            //
            // CURRENT has been durably updated. The commit is now visible to
            // future readers (including after a crash/restart via
            // `load_directory`). Everything below is best-effort cleanup:
            //
            // • Writing the LOG is purely informational.
            //
            // • Superseded files are NOT deleted here — Phase C handles that
            //   after `inner` is updated. On Linux/macOS they are deleted
            //   immediately; on Windows (where open memory maps prevent
            //   deletion) they are retried on the next commit or shutdown.
            //
            // Errors here must NOT propagate, because the WriteOperationGuard
            // would then run its rollback and delete the *newly committed*
            // files, corrupting the database.

            if let Err(e) = (|| {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ         | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST    | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                /// Logs sequence numbers 15 per line, aligned to the table
                /// layout printed above.
                fn write_seq_numbers<W: std::io::Write, T>(
                    log: &mut W,
                    items: &[T],
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()> {
                    for chunk in items.chunks(15) {
                        write!(log, "    |          |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, &new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    &blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(
                    &mut log,
                    &sst_seq_numbers_to_delete,
                    "SST DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(
                    &mut log,
                    &meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                anyhow::Ok(())
            })() {
                eprintln!("turbo-persistence: failed to write LOG after commit {seq:08}: {e:#}");
            }

            anyhow::Ok(())
        })?;

        // ── Phase C: structurally update inner (CURRENT is already durable). ──
        //
        // Between Phase A's read-lock drop and this point no other writer can
        // run (WriteOperationGuard ensures exclusivity) and readers never mutate
        // inner, so the snapshot from Phase A is still valid.
        {
            let mut inner = self.inner.write();

            // Apply the deferred MetaFile mutations from Phase A1. apply_filter
            // was called read-only earlier; now we actually move superseded
            // entries from active to obsolete inside each MetaFile.
            // entries_to_remove was collected in reverse order, so iterate it
            // in reverse to match the forward order of inner.meta_files.
            for (meta_file, to_remove) in inner
                .meta_files
                .iter_mut()
                .zip(entries_to_remove.into_iter().rev())
            {
                if !to_remove.is_empty() {
                    meta_file.retain_entries(|seq| !to_remove.contains(&seq));
                }
            }

            inner.meta_files.append(&mut new_meta_files);
            if !meta_seq_numbers_to_delete.is_empty() {
                let to_delete: HashSet<u32> = meta_seq_numbers_to_delete.iter().copied().collect();
                inner
                    .meta_files
                    .retain(|meta| !to_delete.contains(&meta.sequence_number()));
            }
            inner.current_sequence_number = seq;
            self.is_empty
                .store(inner.meta_files.is_empty(), Ordering::Relaxed);
        }

        // Try to delete superseded files immediately. On Linux/macOS this always
        // works even if readers have the files memory-mapped. On Windows, open
        // memory maps prevent deletion; any file that fails is kept in
        // `deferred_deletions` and retried on the next commit or at shutdown.
        self.deferred_deletions.lock().extend(
            Self::try_delete_files(&self.path, &sst_seq_numbers_to_delete, "sst")
                .map(DeferredDeletion::Sst)
                .chain(
                    Self::try_delete_files(&self.path, &meta_seq_numbers_to_delete, "meta")
                        .map(DeferredDeletion::Meta),
                )
                .chain(
                    Self::try_delete_files(&self.path, &blob_seq_numbers_to_delete, "blob")
                        .map(DeferredDeletion::Blob),
                ),
        );

        // Retry any deletions that failed in earlier commits.
        self.retry_deferred_deletions();

        // Best-effort verbose log of the new database state after Phase C.
        #[cfg(feature = "verbose_log")]
        {
            let _: Result<(), _> = (|| -> anyhow::Result<()> {
                let mut log = self.open_log()?;
                writeln!(log, "New database state:")?;
                writeln!(log, "FAM | META SEQ | SST SEQ  FLAGS | RANGE")?;
                let inner = self.inner.read();
                // Distinct families in first-seen order, via a stateful filter.
                let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                    let mut set = HashSet::new();
                    move |family| set.insert(*family)
                });
                for family in families {
                    for meta in inner.meta_files.iter() {
                        if meta.family() != family {
                            continue;
                        }
                        let meta_seq = meta.sequence_number();
                        for entry in meta.entries().iter() {
                            let seq = entry.sequence_number();
                            let range = entry.range();
                            writeln!(
                                log,
                                "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                entry.flags(),
                                range_to_str(range.min_hash, range.max_hash)
                            )?;
                        }
                    }
                }
                Ok(())
            })();
        }

        Ok(())
    }
1083
1084    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
1085    /// duplicate keys and separating all key ranges into unique files.
1086    pub fn full_compact(&self) -> Result<()> {
1087        self.compact(&CompactConfig {
1088            min_merge_count: 2,
1089            optimal_merge_count: usize::MAX,
1090            max_merge_count: usize::MAX,
1091            max_merge_bytes: u64::MAX,
1092            min_merge_duplication_bytes: 0,
1093            optimal_merge_duplication_bytes: u64::MAX,
1094            max_merge_segment_count: usize::MAX,
1095        })?;
1096        Ok(())
1097    }
1098
1099    /// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
1100    /// files is above the given threshold. The coverage is the average number of SST files that
1101    /// need to be read to find a key. It also limits the maximum number of SST files that are
1102    /// merged at once, which is the main factor for the runtime of the compaction.
1103    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
1104        let mut guard = self.acquire_write_operation("compaction")?;
1105
1106        // Free block caches and SST mmaps before compaction. The block caches
1107        // are not used during compaction (we iterate uncached), and any cached
1108        // SST mmaps would use MADV_RANDOM which is wrong for sequential scans.
1109        // Clearing them upfront frees memory for the merge work.
1110        self.clear_cache();
1111
1112        let mut sequence_number;
1113        let mut new_meta_files = Vec::new();
1114        let mut new_sst_files = Vec::new();
1115        let mut sst_seq_numbers_to_delete = Vec::new();
1116        let mut blob_seq_numbers_to_delete = Vec::new();
1117        let mut keys_written = 0;
1118
1119        {
1120            let inner = self.inner.read();
1121            sequence_number = AtomicU32::new(inner.current_sequence_number);
1122            self.compact_internal(
1123                &inner.meta_files,
1124                &sequence_number,
1125                &mut new_meta_files,
1126                &mut new_sst_files,
1127                &mut sst_seq_numbers_to_delete,
1128                &mut blob_seq_numbers_to_delete,
1129                &mut keys_written,
1130                compact_config,
1131            )
1132            .context("Failed to compact database")?;
1133        }
1134
1135        let has_changes = !new_meta_files.is_empty();
1136        if has_changes {
1137            self.commit(CommitOptions {
1138                new_meta_files,
1139                new_sst_files,
1140                new_blob_files: Vec::new(),
1141                sst_seq_numbers_to_delete,
1142                blob_seq_numbers_to_delete,
1143                sequence_number: *sequence_number.get_mut(),
1144                keys_written,
1145            })
1146            .context("Failed to commit the database compaction")?;
1147        }
1148
1149        guard.success();
1150        Ok(has_changes)
1151    }
1152
1153    /// Internal function to perform a compaction.
1154    fn compact_internal(
1155        &self,
1156        meta_files: &[MetaFile],
1157        sequence_number: &AtomicU32,
1158        new_meta_files: &mut Vec<(u32, File)>,
1159        new_sst_files: &mut Vec<(u32, File)>,
1160        sst_seq_numbers_to_delete: &mut Vec<u32>,
1161        blob_seq_numbers_to_delete: &mut Vec<u32>,
1162        keys_written: &mut u64,
1163        compact_config: &CompactConfig,
1164    ) -> Result<()> {
1165        if meta_files.is_empty() {
1166            return Ok(());
1167        }
1168
1169        struct SstWithRange {
1170            meta_index: usize,
1171            index_in_meta: u32,
1172            seq: u32,
1173            range: StaticSortedFileRange,
1174            size: u64,
1175            flags: MetaEntryFlags,
1176        }
1177
1178        impl Compactable for SstWithRange {
1179            fn range(&self) -> RangeInclusive<u64> {
1180                self.range.min_hash..=self.range.max_hash
1181            }
1182
1183            fn size(&self) -> u64 {
1184                self.size
1185            }
1186
1187            fn category(&self) -> u8 {
1188                // Cold and non-cold files are placed separately so we pass different category
1189                // values to ensure they are not merged together.
1190                if self.flags.cold() { 1 } else { 0 }
1191            }
1192        }
1193
1194        let ssts_with_ranges = meta_files
1195            .iter()
1196            .enumerate()
1197            .flat_map(|(meta_index, meta)| {
1198                meta.entries()
1199                    .iter()
1200                    .enumerate()
1201                    .map(move |(index_in_meta, entry)| SstWithRange {
1202                        meta_index,
1203                        index_in_meta: index_in_meta as u32,
1204                        seq: entry.sequence_number(),
1205                        range: entry.range(),
1206                        size: entry.size(),
1207                        flags: entry.flags(),
1208                    })
1209            })
1210            .collect::<Vec<_>>();
1211
1212        let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
1213
1214        for sst in ssts_with_ranges {
1215            sst_by_family[sst.range.family as usize].push(sst);
1216        }
1217
1218        let path = &self.path;
1219
1220        let log_mutex = Mutex::new(());
1221
1222        struct PartialResultPerFamily {
1223            new_meta_file: Option<(u32, File)>,
1224            new_sst_files: Vec<(u32, File)>,
1225            sst_seq_numbers_to_delete: Vec<u32>,
1226            blob_seq_numbers_to_delete: Vec<u32>,
1227            keys_written: u64,
1228        }
1229
1230        let mut compact_config = compact_config.clone();
1231        let merge_jobs = sst_by_family
1232            .into_iter()
1233            .enumerate()
1234            .filter_map(|(family, ssts_with_ranges)| {
1235                if compact_config.max_merge_segment_count == 0 {
1236                    return None;
1237                }
1238                let (merge_jobs, real_merge_job_size) =
1239                    get_merge_segments(&ssts_with_ranges, &compact_config);
1240                compact_config.max_merge_segment_count -= real_merge_job_size;
1241                Some((family, ssts_with_ranges, merge_jobs))
1242            })
1243            .collect::<Vec<_>>();
1244
1245        let result = self
1246            .parallel_scheduler
1247            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
1248                merge_jobs,
1249                |(family, ssts_with_ranges, merge_jobs)| {
1250                    let family = family as u32;
1251
1252                    if merge_jobs.is_empty() {
1253                        return Ok(PartialResultPerFamily {
1254                            new_meta_file: None,
1255                            new_sst_files: Vec::new(),
1256                            sst_seq_numbers_to_delete: Vec::new(),
1257                            blob_seq_numbers_to_delete: Vec::new(),
1258                            keys_written: 0,
1259                        });
1260                    }
1261
1262                    // Deserialize and merge used key hash filters per-family into
1263                    // a single filter. This avoids O(entries × N) filter probes
1264                    // during the merge loop. Empty filters (from commits with no
1265                    // reads) are discarded.
1266                    let used_key_hashes: Option<qfilter::Filter> = {
1267                        let filters: Vec<qfilter::FilterRef<'_>> = meta_files
1268                            .iter()
1269                            .filter(|m| m.family() == family)
1270                            .filter_map(|meta_file| {
1271                                meta_file.deserialize_used_key_hashes_amqf().transpose()
1272                            })
1273                            .collect::<Result<Vec<_>>>()?
1274                            .into_iter()
1275                            .filter(|amqf| !amqf.is_empty())
1276                            .collect();
1277                        if filters.is_empty() {
1278                            None
1279                        } else if filters.len() == 1 {
1280                            // Just directly use the single item
1281                            Some(filters[0].to_owned())
1282                        } else {
1283                            let total_len: u64 = filters.iter().map(|f| f.len()).sum();
1284                            // Fingerprint size must match the source filters to
1285                            // enable the efficient sorted merge path in qfilter.
1286                            let mut merged =
1287                                qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
1288                                    .expect("Failed to create merged AMQF filter");
1289                            for filter in &filters {
1290                                merged
1291                                    .merge(false, filter)
1292                                    .expect("Failed to merge AMQF filters");
1293                            }
1294                            merged.shrink_to_fit();
1295                            Some(merged)
1296                        }
1297                    };
1298
1299                    // Later we will remove the merged files
1300                    let sst_seq_numbers_to_delete = merge_jobs
1301                        .iter()
1302                        .filter(|l| l.len() > 1)
1303                        .flat_map(|l| l.iter().copied())
1304                        .map(|index| ssts_with_ranges[index].seq)
1305                        .collect::<Vec<_>>();
1306
1307                    // Merge SST files
1308                    let span = tracing::trace_span!(
1309                        "merge files",
1310                        family = self.config.family_configs[family as usize].name
1311                    );
1312                    enum PartialMergeResult<'l> {
1313                        Merged {
1314                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1315                            blob_seq_numbers_to_delete: Vec<u32>,
1316                            keys_written: u64,
1317                            indices: SmallVec<[usize; 1]>,
1318                        },
1319                        Move {
1320                            seq: u32,
1321                            meta: StaticSortedFileBuilderMeta<'l>,
1322                        },
1323                    }
1324                    let merge_result = self
1325                        .parallel_scheduler
1326                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
1327                            let _span = span.clone().entered();
1328                            if indices.len() == 1 {
1329                                // If we only have one file, we can just move it
1330                                let index = indices[0];
1331                                let meta_index = ssts_with_ranges[index].meta_index;
1332                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
1333                                let meta_file = &meta_files[meta_index];
1334                                let entry = meta_file.entry(index_in_meta);
1335                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
1336                                let meta = StaticSortedFileBuilderMeta {
1337                                    min_hash: entry.min_hash(),
1338                                    max_hash: entry.max_hash(),
1339                                    amqf,
1340                                    block_count: entry.block_count(),
1341                                    size: entry.size(),
1342                                    flags: entry.flags(),
1343                                    entries: 0,
1344                                };
1345                                return Ok(PartialMergeResult::Move {
1346                                    seq: entry.sequence_number(),
1347                                    meta,
1348                                });
1349                            }
1350
1351                            // Open SST files independently for compaction.
1352                            // Uses MADV_SEQUENTIAL for better OS page management
1353                            // and avoids caching mmaps on MetaEntry's OnceLock.
1354                            let iters = indices
1355                                .iter()
1356                                .map(|&index| {
1357                                    let meta_index = ssts_with_ranges[index].meta_index;
1358                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
1359                                    let entry = meta_files[meta_index].entry(index_in_meta);
1360                                    StaticSortedFileIter::open(path, entry.sst_metadata())
1361                                })
1362                                .collect::<Result<Vec<_>>>()?;
1363
1364                            let iter = MergeIter::new(iters.into_iter())?;
1365
1366                            let mut blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
1367
1368                            struct Collector {
1369                                /// The active writer and its sequence number. `None` if no
1370                                /// entries have been added since the last flush. We defer
1371                                /// allocation to avoid creating empty SST files for collectors
1372                                /// that receive no entries (e.g., the unused_collector when
1373                                /// all keys are in the
1374                                /// used set).
1375                                writer: Option<(u32, StreamingSstWriter<LookupEntry>)>,
1376                                flags: MetaEntryFlags,
1377                                new_sst_files:
1378                                    Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1379                                /// Hash of the last key added. Used to ensure we only split
1380                                /// SST files at key boundaries (not mid-key-group for MultiValue).
1381                                last_hash: Option<u64>,
1382                            }
                            // One `Collector` per output "temperature": entries whose keys were
                            // recently accessed are written with WARM flags, the rest with COLD
                            // (see the two instances constructed below). Writers are opened
                            // lazily and rotated to a fresh SST file when the current one fills
                            // up.
                            impl Collector {
                                fn new(flags: MetaEntryFlags) -> Self {
                                    Self {
                                        writer: None,
                                        flags,
                                        new_sst_files: Vec::new(),
                                        last_hash: None,
                                    }
                                }

                                /// Ensures a writer is open, creating one if needed.
                                /// A new writer reserves a fresh global sequence number so its
                                /// on-disk name (`{seq:08}.sst`) is unique.
                                fn ensure_writer(
                                    &mut self,
                                    path: &Path,
                                    sequence_number: &AtomicU32,
                                ) -> Result<&mut StreamingSstWriter<LookupEntry>>
                                {
                                    if self.writer.is_none() {
                                        // `fetch_add` returns the previous value, so `+ 1` is
                                        // the sequence number that was just reserved.
                                        let seq =
                                            sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                                        let sst_path = path.join(format!("{seq:08}.sst"));
                                        let writer = StreamingSstWriter::new(
                                            &sst_path,
                                            self.flags,
                                            MAX_ENTRIES_PER_COMPACTED_FILE as u64,
                                        )?;
                                        self.writer = Some((seq, writer));
                                    }
                                    // Unwrap is safe: a writer was created just above if absent.
                                    Ok(&mut self.writer.as_mut().unwrap().1)
                                }

                                /// Closes the current SST file (flushing remaining blocks and
                                /// writing the index) and records it in the completed files
                                /// list. No-op if no writer is currently open.
                                fn close_sst_file(&mut self, keys_written: &mut u64) -> Result<()> {
                                    if let Some((seq, writer)) = self.writer.take() {
                                        let _span =
                                            tracing::trace_span!("close merged sst file").entered();
                                        let (meta, file) = writer.close()?;
                                        *keys_written += meta.entries;
                                        self.new_sst_files.push((seq, file, meta));
                                    }
                                    Ok(())
                                }

                                /// Adds an entry to the collector. Only splits the SST file at
                                /// key boundaries to avoid breaking key groups for MultiValue
                                /// families.
                                fn add_entry(
                                    &mut self,
                                    entry: LookupEntry,
                                    path: &Path,
                                    sequence_number: &AtomicU32,
                                    keys_written: &mut u64,
                                ) -> Result<()> {
                                    let key_changed = self.last_hash != Some(entry.hash);
                                    // Only check fullness at key boundaries to avoid splitting
                                    // a key group across two SST files.
                                    if key_changed
                                        && let Some((_, ref writer)) = self.writer
                                        && writer.is_full(
                                            MAX_ENTRIES_PER_COMPACTED_FILE,
                                            DATA_THRESHOLD_PER_COMPACTED_FILE,
                                        )
                                    {
                                        self.close_sst_file(keys_written)?;
                                    }
                                    self.last_hash = Some(entry.hash);
                                    let writer = self.ensure_writer(path, sequence_number)?;
                                    writer.add(entry)?;
                                    Ok(())
                                }
                            }
1456                            #[cfg(debug_assertions)]
1457                            impl Drop for Collector {
1458                                fn drop(&mut self) {
1459                                    if !std::thread::panicking() {
1460                                        assert!(
1461                                            self.writer.is_none(),
1462                                            "Collector dropped with an open writer"
1463                                        );
1464                                    }
1465                                }
1466                            }
1467                            let mut used_collector = Collector::new(MetaEntryFlags::WARM);
1468                            let mut unused_collector = Collector::new(MetaEntryFlags::COLD);
1469                            let mut current_key: Option<RcBytes> = None;
1470                            let mut keys_written = 0;
1471
1472                            // MergeIter yields entries from newer SSTs first (by SST sequence
1473                            // number). Within each SST, tombstones sort last within key groups.
1474                            // Use a skip flag to handle:
1475                            // - SingleValue: skip all older entries after writing the first
1476                            // - MultiValue: skip all older entries after encountering a tombstone
1477                            //   (which signals deletion of all prior values for this key)
1478                            let mut skip_remaining_for_this_key = false;
1479                            let family_config = &self.config.family_configs[family as usize];
1480
1481                            for entry in iter {
1482                                let entry = entry?;
1483                                if current_key.as_ref() != Some(&entry.key) {
1484                                    // we changed keys so undo this flag
1485                                    skip_remaining_for_this_key = false;
1486                                    current_key = Some(entry.key.clone());
1487                                }
1488                                if !skip_remaining_for_this_key {
1489                                    let is_used = used_key_hashes
1490                                        .as_ref()
1491                                        .is_some_and(|amqf| amqf.contains_fingerprint(entry.hash));
1492                                    let collector = if is_used {
1493                                        &mut used_collector
1494                                    } else {
1495                                        &mut unused_collector
1496                                    };
1497                                    match family_config.kind {
1498                                        FamilyKind::MultiValue => {
1499                                            // For MultiValue families we only skip remaining if we
1500                                            // see a tombstone
1501                                            if matches!(entry.value, IterValue::Deleted) {
1502                                                skip_remaining_for_this_key = true;
1503                                            }
1504                                        }
1505                                        FamilyKind::SingleValue => {
1506                                            // Since MergeItr is in newest to oldest order anything
1507                                            // else that comes out must be skipped
1508                                            skip_remaining_for_this_key = true;
1509                                        }
1510                                    }
1511                                    collector.add_entry(
1512                                        entry,
1513                                        path,
1514                                        sequence_number,
1515                                        &mut keys_written,
1516                                    )?;
1517                                } else {
1518                                    // Entry is being dropped (superseded by newer entry or
1519                                    // pruned by tombstone). If it references a blob file,
1520                                    // mark that blob for deletion.
1521                                    if let IterValue::Blob { sequence_number } = &entry.value {
1522                                        blob_seq_numbers_to_delete.push(*sequence_number);
1523                                    }
1524                                }
1525                            }
1526
1527                            // Close remaining writers
1528                            used_collector.close_sst_file(&mut keys_written)?;
1529                            unused_collector.close_sst_file(&mut keys_written)?;
1530
1531                            let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1532                            new_sst_files.append(&mut used_collector.new_sst_files);
1533                            Ok(PartialMergeResult::Merged {
1534                                new_sst_files,
1535                                blob_seq_numbers_to_delete,
1536                                keys_written,
1537                                indices,
1538                            })
1539                        })
1540                        .with_context(|| {
1541                            format!("Failed to merge database files for family {family}")
1542                        })?;
1543
1544                    let Some((sst_files_len, blob_delete_len)) = merge_result
1545                        .iter()
1546                        .map(|r| {
1547                            if let PartialMergeResult::Merged {
1548                                new_sst_files,
1549                                blob_seq_numbers_to_delete,
1550                                indices: _,
1551                                keys_written: _,
1552                            } = r
1553                            {
1554                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
1555                            } else {
1556                                (0, 0)
1557                            }
1558                        })
1559                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
1560                    else {
1561                        unreachable!()
1562                    };
1563
1564                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
1565                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
1566
1567                    let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1568                    let mut meta_file_builder = MetaFileBuilder::new(family);
1569
1570                    let mut keys_written = 0;
1571                    self.parallel_scheduler.block_in_place(|| {
1572                        let guard = log_mutex.lock();
1573                        let mut log = self.open_log()?;
1574                        writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
1575                        for result in merge_result {
1576                            match result {
1577                                PartialMergeResult::Merged {
1578                                    new_sst_files: merged_new_sst_files,
1579                                    blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
1580                                    keys_written: merged_keys_written,
1581                                    indices,
1582                                } => {
1583                                    writeln!(
1584                                        log,
1585                                        "{family:3} | {meta_seq:08} | MERGE \
1586                                         ({merged_keys_written} keys):"
1587                                    )?;
1588                                    for i in indices.iter() {
1589                                        let seq = ssts_with_ranges[*i].seq;
1590                                        let (min, max) = ssts_with_ranges[*i].range().into_inner();
1591                                        writeln!(
1592                                            log,
1593                                            "{family:3} | {meta_seq:08} | {seq:08} INPUT  | {}",
1594                                            range_to_str(min, max)
1595                                        )?;
1596                                    }
1597                                    for (seq, file, meta) in merged_new_sst_files {
1598                                        let min = meta.min_hash;
1599                                        let max = meta.max_hash;
1600                                        writeln!(
1601                                            log,
1602                                            "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
1603                                             ({})",
1604                                            range_to_str(min, max),
1605                                            meta.flags
1606                                        )?;
1607
1608                                        meta_file_builder.add(seq, meta);
1609                                        new_sst_files.push((seq, file));
1610                                    }
1611                                    blob_seq_numbers_to_delete
1612                                        .extend(merged_blob_seq_numbers_to_delete);
1613                                    keys_written += merged_keys_written;
1614                                }
1615                                PartialMergeResult::Move { seq, meta } => {
1616                                    let min = meta.min_hash;
1617                                    let max = meta.max_hash;
1618                                    writeln!(
1619                                        log,
1620                                        "{family:3} | {meta_seq:08} | {seq:08} MOVED  | {}",
1621                                        range_to_str(min, max)
1622                                    )?;
1623
1624                                    meta_file_builder.add(seq, meta);
1625                                }
1626                            }
1627                        }
1628                        drop(log);
1629                        drop(guard);
1630
1631                        anyhow::Ok(())
1632                    })?;
1633
1634                    for &seq in sst_seq_numbers_to_delete.iter() {
1635                        meta_file_builder.add_obsolete_sst_file(seq);
1636                    }
1637
1638                    let meta_file = {
1639                        let _span = tracing::trace_span!("write meta file").entered();
1640                        self.parallel_scheduler
1641                            .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
1642                    };
1643
1644                    Ok(PartialResultPerFamily {
1645                        new_meta_file: Some((meta_seq, meta_file)),
1646                        new_sst_files,
1647                        sst_seq_numbers_to_delete,
1648                        blob_seq_numbers_to_delete,
1649                        keys_written,
1650                    })
1651                },
1652            )?;
1653
1654        for PartialResultPerFamily {
1655            new_meta_file: inner_new_meta_file,
1656            new_sst_files: mut inner_new_sst_files,
1657            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
1658            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
1659            keys_written: inner_keys_written,
1660        } in result
1661        {
1662            new_meta_files.extend(inner_new_meta_file);
1663            new_sst_files.append(&mut inner_new_sst_files);
1664            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
1665            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
1666            *keys_written += inner_keys_written;
1667        }
1668
1669        Ok(())
1670    }
1671
1672    /// Get a value from the database. Returns None if the key is not found. The returned value
1673    /// might hold onto a block of the database and it should not be hold long-term.
1674    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcBytes>> {
1675        debug_assert!(family < FAMILIES, "Family index out of bounds");
1676        if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1677            // This is an error in our caller so just panic
1678            panic!(
1679                "only single valued tables can be queried with `get', call `get_multiple` instead"
1680            )
1681        }
1682        let span = tracing::trace_span!(
1683            "database read",
1684            name = self.config.family_configs[family].name,
1685            result_size = tracing::field::Empty
1686        )
1687        .entered();
1688        let results = self.get_impl::<K, false>(family, key, &span)?;
1689        debug_assert!(results.len() <= 1, "get() should return at most one result");
1690        Ok(results.into_iter().next())
1691    }
1692
1693    /// Looks up a key and returns all matching values.
1694    ///
1695    /// This is useful for keyspaces where keys are not unique and multiple mappings are possible.
1696    /// Unlike `get`, which returns only the first match, this method returns all
1697    /// entries with the same key from all SST files.  By default however we assume these
1698    /// collections are small and thus optimize for there being exactly 0 or 1 results.
1699    ///
1700    /// The order of returned values is undefined and duplicates are preserved. Callers must not
1701    /// rely on any particular ordering (neither insertion order nor byte order).
1702    pub fn get_multiple<K: QueryKey>(
1703        &self,
1704        family: usize,
1705        key: &K,
1706    ) -> Result<SmallVec<[ArcBytes; 1]>> {
1707        debug_assert!(family < FAMILIES, "Family index out of bounds");
1708        if self.config.family_configs[family].kind != FamilyKind::MultiValue {
1709            // This is an error in our caller so just panic
1710            panic!("only multi-valued tables can be queried with `get_multiple`")
1711        }
1712        let span = tracing::trace_span!(
1713            "database read multiple",
1714            name = self.config.family_configs[family].name,
1715            result_count = tracing::field::Empty,
1716            result_size = tracing::field::Empty
1717        )
1718        .entered();
1719        let results = self.get_impl::<K, true>(family, key, &span)?;
1720        Ok(results)
1721    }
1722
    /// Shared implementation for `get` and `get_multiple`.
    ///
    /// If `FIND_ALL` is false, stops after finding the first match.
    /// If `FIND_ALL` is true, continues to find all matches across all meta files.
    ///
    /// Records `result_size` (and, for `FIND_ALL`, `result_count`) on `span`.
    fn get_impl<K: QueryKey, const FIND_ALL: bool>(
        &self,
        family: usize,
        key: &K,
        span: &EnteredSpan,
    ) -> Result<SmallVec<[ArcBytes; 1]>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        let mut output: SmallVec<[ArcBytes; 1]> = SmallVec::new();
        // Track whether we found the key in any SST (even if deleted).
        // Used for miss_global stat: only fires if key was never found anywhere.
        #[cfg(feature = "stats")]
        let mut found_in_sst = false;

        // Accumulated byte size of all values collected so far, reported on the span.
        let mut size = 0;

        // Scan meta files in reverse so more recently written files take precedence.
        // NOTE(review): assumes `meta_files` is ordered oldest-to-newest — confirm
        // against the write/compaction path.
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup::<K, FIND_ALL>(
                family as u32,
                hash,
                key,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                // This meta file holds no data for the requested family.
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                // The hash is outside the hash ranges covered by this meta file.
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                // The approximate-membership filter ruled the key out.
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(values) => {
                        #[cfg(feature = "stats")]
                        {
                            found_in_sst = true;
                        }
                        // Record the access; these hashes feed the WARM/COLD split
                        // during compaction.
                        inner.accessed_key_hashes[family].insert(hash);
                        // Process values. Tombstones sort last within a key group,
                        // so when we see a tombstone, we can return immediately.
                        for value in values {
                            match value {
                                LookupValue::Deleted => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", "deleted");
                                        return Ok(SmallVec::new());
                                    }
                                    // Tombstone is last in key group. Return accumulated
                                    // values (from this SST and newer layers). Stop
                                    // searching older SSTs.
                                    if output.is_empty() {
                                        span.record("result_size", "deleted");
                                    } else {
                                        span.record("result_size", size);
                                    }
                                    return Ok(output);
                                }
                                LookupValue::Slice { value } => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", value.len());
                                        return Ok(SmallVec::from_buf([value]));
                                    }
                                    size += value.len();
                                    output.push(value);
                                }
                                LookupValue::Blob { sequence_number } => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                                    // Large values live in separate blob files addressed by
                                    // sequence number.
                                    let blob = self.read_blob(sequence_number)?;
                                    if !FIND_ALL {
                                        span.record("result_size", blob.len());
                                        return Ok(SmallVec::from_buf([blob]));
                                    }
                                    size += blob.len();
                                    output.push(blob);
                                }
                            }
                        }
                    }
                    // Key absent from the SSTs selected by this meta file.
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }

        #[cfg(feature = "stats")]
        if !found_in_sst {
            self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        }

        if FIND_ALL {
            span.record("result_count", output.len());
        }
        if output.is_empty() {
            span.record("result_size", "not_found");
        } else {
            span.record("result_size", size);
        }
        Ok(output)
    }
1838
1839    pub fn batch_get<K: QueryKey>(
1840        &self,
1841        family: usize,
1842        keys: &[K],
1843    ) -> Result<Vec<Option<ArcBytes>>> {
1844        debug_assert!(family < FAMILIES, "Family index out of bounds");
1845        if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1846            // This is an error in our caller so just panic
1847            panic!("only single valued tables can be queried with `batch_get'")
1848        }
1849        let span = tracing::trace_span!(
1850            "database batch read",
1851            name = self.config.family_configs[family].name,
1852            keys = keys.len(),
1853            not_found = tracing::field::Empty,
1854            deleted = tracing::field::Empty,
1855            result_size = tracing::field::Empty
1856        )
1857        .entered();
1858        let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
1859        let mut empty_cells = keys.len();
1860        for (index, key) in keys.iter().enumerate() {
1861            let hash = hash_key(key);
1862            cells.push((hash, index, None));
1863        }
1864        cells.sort_by_key(|(hash, _, _)| *hash);
1865        let inner = self.inner.read();
1866        for meta in inner.meta_files.iter().rev() {
1867            let _result = meta.batch_lookup(
1868                family as u32,
1869                keys,
1870                &mut cells,
1871                &mut empty_cells,
1872                &self.key_block_cache,
1873                &self.value_block_cache,
1874            )?;
1875
1876            #[cfg(feature = "stats")]
1877            {
1878                let crate::meta_file::MetaBatchLookupResult {
1879                    family_miss,
1880                    range_misses,
1881                    quick_filter_misses,
1882                    sst_misses,
1883                    hits: _,
1884                } = _result;
1885                if family_miss {
1886                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1887                }
1888                if range_misses > 0 {
1889                    self.stats
1890                        .miss_range
1891                        .fetch_add(range_misses as u64, Ordering::Relaxed);
1892                }
1893                if quick_filter_misses > 0 {
1894                    self.stats
1895                        .miss_amqf
1896                        .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
1897                }
1898                if sst_misses > 0 {
1899                    self.stats
1900                        .miss_key
1901                        .fetch_add(sst_misses as u64, Ordering::Relaxed);
1902                }
1903            }
1904
1905            if empty_cells == 0 {
1906                break;
1907            }
1908        }
1909        let mut deleted = 0;
1910        let mut not_found = 0;
1911        let mut result_size = 0;
1912        let mut results = vec![None; keys.len()];
1913        for (hash, index, result) in cells {
1914            if let Some(result) = result {
1915                inner.accessed_key_hashes[family].insert(hash);
1916                let result = match result {
1917                    LookupValue::Deleted => {
1918                        #[cfg(feature = "stats")]
1919                        self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1920                        deleted += 1;
1921                        None
1922                    }
1923                    LookupValue::Slice { value } => {
1924                        #[cfg(feature = "stats")]
1925                        self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1926                        result_size += value.len();
1927                        Some(value)
1928                    }
1929                    LookupValue::Blob { sequence_number } => {
1930                        #[cfg(feature = "stats")]
1931                        self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1932                        let blob = self.read_blob(sequence_number)?;
1933                        result_size += blob.len();
1934                        Some(blob)
1935                    }
1936                };
1937                results[index] = result;
1938            } else {
1939                #[cfg(feature = "stats")]
1940                self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1941                not_found += 1;
1942            }
1943        }
1944        span.record("not_found", not_found);
1945        span.record("deleted", deleted);
1946        span.record("result_size", result_size);
1947        Ok(results)
1948    }
1949
1950    /// Returns database statistics.
1951    #[cfg(feature = "stats")]
1952    pub fn statistics(&self) -> Statistics {
1953        let inner = self.inner.read();
1954        Statistics {
1955            meta_files: inner.meta_files.len(),
1956            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
1957            key_block_cache: CacheStatistics::new(&self.key_block_cache),
1958            value_block_cache: CacheStatistics::new(&self.value_block_cache),
1959            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
1960                + self.stats.hits_small.load(Ordering::Relaxed)
1961                + self.stats.hits_blob.load(Ordering::Relaxed),
1962            misses: self.stats.miss_global.load(Ordering::Relaxed),
1963            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
1964            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
1965            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
1966            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
1967        }
1968    }
1969
1970    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1971        Ok(self
1972            .inner
1973            .read()
1974            .meta_files
1975            .iter()
1976            .rev()
1977            .map(|meta_file| {
1978                let entries = meta_file
1979                    .entries()
1980                    .iter()
1981                    .map(|entry| {
1982                        let amqf = entry.raw_amqf(meta_file.amqf_data());
1983                        MetaFileEntryInfo {
1984                            sequence_number: entry.sequence_number(),
1985                            min_hash: entry.min_hash(),
1986                            max_hash: entry.max_hash(),
1987                            sst_size: entry.size(),
1988                            flags: entry.flags(),
1989                            amqf_size: entry.amqf_size(),
1990                            amqf_entries: amqf.len(),
1991                            block_count: entry.block_count(),
1992                        }
1993                    })
1994                    .collect();
1995                MetaFileInfo {
1996                    sequence_number: meta_file.sequence_number(),
1997                    family: meta_file.family(),
1998                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1999                    entries,
2000                }
2001            })
2002            .collect())
2003    }
2004
2005    /// Shuts down the database. This will print statistics if the `print_stats` feature is enabled.
2006    /// Retries deletion of all previously-deferred files and clears successfully deleted batches.
2007    pub fn shutdown(&self) -> Result<()> {
2008        #[cfg(feature = "print_stats")]
2009        println!("{:#?}", self.statistics());
2010        self.retry_deferred_deletions();
2011        Ok(())
2012    }
2013
2014    /// Attempts to delete files with the given extension, returning an iterator of sequence
2015    /// numbers for files that could not be deleted (e.g. due to open memory maps on Windows).
2016    fn try_delete_files<'a>(
2017        dir: &'a Path,
2018        seqs: &'a [u32],
2019        ext: &'a str,
2020    ) -> impl Iterator<Item = u32> + 'a {
2021        seqs.iter()
2022            .copied()
2023            .filter(move |&seq| fs::remove_file(dir.join(format!("{seq:08}.{ext}"))).is_err())
2024    }
2025
2026    /// Retries deletion of files that previously failed (typically due to open memory maps on
2027    /// Windows). Any file that still fails is kept for the next retry.
2028    /// Best-effort: persistent failures are acceptable because `load_directory` cleans up
2029    /// any leftover files on the next open via the `.del` file.
2030    fn retry_deferred_deletions(&self) {
2031        let mut deferred = self.deferred_deletions.lock();
2032        deferred.retain(|entry| {
2033            let (seq, ext) = match *entry {
2034                DeferredDeletion::Sst(seq) => (seq, "sst"),
2035                DeferredDeletion::Meta(seq) => (seq, "meta"),
2036                DeferredDeletion::Blob(seq) => (seq, "blob"),
2037            };
2038            // Keep the entry only if deletion still fails.
2039            fs::remove_file(self.path.join(format!("{seq:08}.{ext}"))).is_err()
2040        });
2041    }
2042}
2043
/// Renders a `u64` hash range as a fixed-width ASCII bar (`[`, `=`, `]`, or a
/// single `O` when both ends land on the same cell) followed by the hex bounds.
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // Project a bound onto the DISPLAY_SIZE-wide bar (widened to u128 to avoid overflow).
    let scale = |value: u64| (value as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize;
    let start_pos = scale(min);
    let end_pos = scale(max);
    let mut range_str = String::with_capacity(DISPLAY_SIZE + 40);
    for pos in 0..DISPLAY_SIZE {
        let marker = if pos == start_pos && pos == end_pos {
            'O'
        } else if pos == start_pos {
            '['
        } else if pos == end_pos {
            ']'
        } else if pos > start_pos && pos < end_pos {
            '='
        } else {
            ' '
        };
        range_str.push(marker);
    }
    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
    range_str
}
2067
/// Introspection snapshot of a single meta file, mirroring the `MetaFile`
/// accessors it is populated from.
pub struct MetaFileInfo {
    /// Sequence number of the meta file itself.
    pub sequence_number: u32,
    /// Key family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files this meta file marks as obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// One entry per SST file described by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
2074
/// Introspection snapshot of one SST entry within a meta file.
pub struct MetaFileEntryInfo {
    /// Sequence number of the SST file described by this entry.
    pub sequence_number: u32,
    /// Lowest key hash covered by this SST file.
    pub min_hash: u64,
    /// Highest key hash covered by this SST file.
    pub max_hash: u64,
    /// Size of the entry's AMQF (approximate membership query filter).
    pub amqf_size: u32,
    /// Number of entries stored in the AMQF.
    pub amqf_entries: usize,
    /// Size of the SST file.
    pub sst_size: u64,
    /// Flags recorded for this entry in the meta file.
    pub flags: MetaEntryFlags,
    /// Number of blocks in the SST file.
    pub block_count: u16,
}