// turbo_persistence/db.rs
1use std::{
2    borrow::Cow,
3    collections::HashSet,
4    fs::{self, File, OpenOptions, ReadDir},
5    io::{BufWriter, Write},
6    mem::take,
7    ops::RangeInclusive,
8    path::{Path, PathBuf},
9    sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use dashmap::DashSet;
15use jiff::Timestamp;
16use memmap2::Mmap;
17use nohash_hasher::BuildNoHashHasher;
18use parking_lot::{Mutex, RwLock};
19use smallvec::SmallVec;
20use tracing::span::EnteredSpan;
21
22pub use crate::compaction::selector::CompactConfig;
23use crate::{
24    DbConfig, FamilyKind, QueryKey,
25    arc_bytes::ArcBytes,
26    compaction::selector::{Compactable, get_merge_segments},
27    compression::{checksum_block, decompress_into_arc},
28    constants::{
29        DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
30        MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
31    },
32    key::{StoreKey, hash_key},
33    lookup_entry::{LazyLookupValue, LookupEntry, LookupValue},
34    merge_iter::MergeIter,
35    meta_file::{MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
36    meta_file_builder::MetaFileBuilder,
37    mmap_helper::advise_mmap_for_persistence,
38    parallel_scheduler::ParallelScheduler,
39    sst_filter::SstFilter,
40    static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile},
41    static_sorted_file_builder::{StaticSortedFileBuilderMeta, StreamingSstWriter},
42    write_batch::{FinishResult, WriteBatch},
43};
44
/// A snapshot of counters for a single block cache.
/// Only compiled in with the `stats` feature.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Fraction of lookups that hit the cache: hits / (hits + misses).
    pub hit_rate: f32,
    /// Fraction of the cache's weight capacity currently in use.
    pub fill: f32,
    /// Number of items currently stored in the cache.
    pub items: usize,
    /// Total weight of the cached entries, as reported by the cache.
    pub size: u64,
    /// Cumulative number of cache hits.
    pub hits: u64,
    /// Cumulative number of cache misses.
    pub misses: u64,
}
55
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Takes a snapshot of the given cache's counters and derives aggregate
    /// statistics (hit rate and fill level) from them.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        // Guard against 0/0 = NaN when the cache has not been accessed yet;
        // report a hit rate of 0.0 instead.
        let accesses = hits + misses;
        let hit_rate = if accesses == 0 {
            0.0
        } else {
            hits as f32 / accesses as f32
        };
        Self {
            hit_rate,
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
79
/// A snapshot of database-wide statistics, exposed for diagnostics.
/// Only compiled in with the `stats` feature.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of meta files currently part of the database state.
    pub meta_files: usize,
    /// Number of SST files referenced by those meta files.
    pub sst_files: usize,
    /// Statistics of the decompressed key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics of the decompressed value block cache.
    pub value_block_cache: CacheStatistics,
    /// Total number of successful lookups.
    pub hits: u64,
    /// Total number of failed lookups.
    pub misses: u64,
    // Miss counts broken down by the check that rejected the lookup
    // (family, hash range, AMQF, full key comparison).
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}
94
/// Internal cumulative lookup counters. Stored as atomics so they can be
/// bumped through a shared reference from concurrent readers.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    // Hits, split by value representation (deleted marker / small inline
    // value / blob) — presumably updated by the lookup path; confirm against
    // the get() implementation.
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    // Misses, split by the check that rejected the lookup.
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}
107
/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
/// using a single write batch. It allows for concurrent reads.
///
/// `S` supplies the parallel scheduling strategy; `FAMILIES` is the compile-time number of
/// key families the store is partitioned into.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    /// Scheduler used for parallel work (file syncing, meta file loading, …).
    parallel_scheduler: S,
    /// The path to the directory where the database is stored
    path: PathBuf,
    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
    /// no modification on the database is performed.
    read_only: bool,
    /// The inner state of the database. Writing will update that.
    inner: RwLock<Inner<FAMILIES>>,
    /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent
    /// write operations.
    active_write_operation: AtomicBool,
    /// A cache for decompressed key blocks.
    key_block_cache: BlockCache,
    /// A cache for decompressed value blocks.
    value_block_cache: BlockCache,
    /// Per-family configuration for file limits.
    config: DbConfig<FAMILIES>,
    /// Statistics for the database.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
132
/// The inner state of the database.
struct Inner<const FAMILIES: usize> {
    /// The list of meta files in the database. This is used to derive the SST files.
    /// Ordered by sequence number, oldest first (newly committed files are appended).
    meta_files: Vec<MetaFile>,
    /// The current sequence number for the database.
    current_sequence_number: u32,
    /// The in progress set of hashes of keys that have been accessed.
    /// It will be flushed onto disk (into a meta file) on next commit.
    /// It's a dashset to allow modification while only tracking a read lock on Inner.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}
144
/// The set of files and bookkeeping data that a commit applies to the database
/// in one atomic step.
pub struct CommitOptions {
    /// Newly written meta files as (sequence number, open file handle) pairs.
    new_meta_files: Vec<(u32, File)>,
    /// Newly written static sorted files.
    new_sst_files: Vec<(u32, File)>,
    /// Newly written blob files.
    new_blob_files: Vec<(u32, File)>,
    /// Sequence numbers of SST files that became obsolete and must be deleted.
    sst_seq_numbers_to_delete: Vec<u32>,
    /// Sequence numbers of blob files that became obsolete and must be deleted.
    blob_seq_numbers_to_delete: Vec<u32>,
    /// The sequence number this commit was created at.
    sequence_number: u32,
    /// Number of keys written by this commit (reported in the LOG file).
    keys_written: u64,
}
154
impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup when the database was not closed
    /// properly. Cleanup only requires to read a few bytes from a few files and to delete
    /// files, so it's fast.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Open a TurboPersistence database at the given path with custom per-family configuration.
    pub fn open_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, config, Default::default())
    }

    /// Open a TurboPersistence database at the given path in read only mode.
    /// This will read the directory. No Cleanup is performed.
    pub fn open_read_only_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, config, Default::default())
    }
}
175
176impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
177    fn new(
178        path: PathBuf,
179        read_only: bool,
180        parallel_scheduler: S,
181        config: DbConfig<FAMILIES>,
182    ) -> Self {
183        Self {
184            parallel_scheduler,
185            path,
186            read_only,
187            inner: RwLock::new(Inner {
188                meta_files: Vec::new(),
189                current_sequence_number: 0,
190                accessed_key_hashes: [(); FAMILIES]
191                    .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
192            }),
193            active_write_operation: AtomicBool::new(false),
194            key_block_cache: BlockCache::with(
195                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
196                KEY_BLOCK_CACHE_SIZE,
197                Default::default(),
198                Default::default(),
199                Default::default(),
200            ),
201            value_block_cache: BlockCache::with(
202                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
203                VALUE_BLOCK_CACHE_SIZE,
204                Default::default(),
205                Default::default(),
206                Default::default(),
207            ),
208            config,
209            #[cfg(feature = "stats")]
210            stats: TrackedStats::default(),
211        }
212    }
213
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup when the database was not closed
    /// properly. Cleanup only requires to read a few bytes from a few files and to delete
    /// files, so it's fast.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, DbConfig::default(), parallel_scheduler)
    }
221
    /// Open a TurboPersistence database at the given path with custom per-family configuration.
    pub fn open_with_config_and_parallel_scheduler(
        path: PathBuf,
        config: DbConfig<FAMILIES>,
        parallel_scheduler: S,
    ) -> Result<Self> {
        // Read-write mode: cleanup of leftover files from a crashed run is allowed.
        let mut db = Self::new(path, false, parallel_scheduler, config);
        db.open_directory(false)?;
        Ok(db)
    }
232
233    /// Open a TurboPersistence database at the given path in read only mode.
234    /// This will read the directory. No Cleanup is performed.
235    fn open_read_only_with_parallel_scheduler(
236        path: PathBuf,
237        config: DbConfig<FAMILIES>,
238        parallel_scheduler: S,
239    ) -> Result<Self> {
240        let mut db = Self::new(path, true, parallel_scheduler, config);
241        db.open_directory(false)?;
242        Ok(db)
243    }
244
    /// Performs the initial check on the database directory.
    ///
    /// If the directory exists but holds no valid database, it is initialized in place
    /// (non-read-only mode only). A missing directory is created and initialized
    /// (non-read-only mode only); in read-only mode both cases are errors.
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    // `load_directory` found no CURRENT file, so this is not a
                    // valid database yet.
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }
272
    /// Creates the directory (including missing parents) and initializes it.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
278
279    /// Initializes the directory by creating the CURRENT file.
280    fn init_directory(&mut self) -> Result<()> {
281        let mut current = File::create(self.path.join("CURRENT"))?;
282        current.write_u32::<BE>(0)?;
283        current.flush()?;
284        Ok(())
285    }
286
    /// Loads an existing database directory and performs cleanup if necessary.
    ///
    /// Returns `Ok(false)` when no CURRENT file exists (the directory still needs to be
    /// initialized), `Ok(true)` once all committed meta files have been loaded into
    /// `self.inner`. With `read_only` set, no files are ever deleted.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    // Not a database yet; the caller will initialize it.
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        // CURRENT holds the highest committed sequence number (big-endian u32).
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Database files are named by their zero-padded sequence number.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Leftover from an uncommitted (crashed) write; discard it.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A *.del file lists sequence numbers whose files were
                            // scheduled for deletion but possibly not yet removed.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    // Remove the files that are marked for deletion
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                // Every listed file is already gone, so the
                                // deletion marker itself is no longer needed.
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // ignore blobs and sst, they are read when needed
                        }
                        _ => {
                            // Tolerate hidden files (e.g. ".DS_Store"); anything
                            // else is unexpected.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read
                    }
                    Some("LOG") => {
                        // Ignored, write-only
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Newer meta files can mark SSTs of older ones as obsolete, so the
        // filter is applied newest-to-oldest.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
401
    /// Reads and decompresses a blob file. This is not backed by any cache.
    ///
    /// On-disk layout: u32 uncompressed length (BE) | u32 checksum (BE) | compressed payload.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcBytes> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let file = File::open(&path)
            .with_context(|| format!("Failed to open blob file {}", path.display()))?;
        // SAFETY: read-only mapping of a database file. Assumes the file is
        // not concurrently truncated/modified while mapped —
        // NOTE(review): confirm blob files are immutable once committed.
        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
            format!(
                "Failed to mmap blob file {} ({} bytes)",
                path.display(),
                file.metadata().map(|m| m.len()).unwrap_or(0)
            )
        })?;
        // The mapping is consumed front-to-back exactly once, so hint
        // sequential access and eager read-ahead to the OS.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        advise_mmap_for_persistence(&mmap)?;
        let mut reader = &mmap[..];
        let uncompressed_length = reader
            .read_u32::<BE>()
            .context("Failed to read uncompressed length from blob file")?;
        let expected_checksum = reader.read_u32::<BE>()?;

        // Verify checksum on the compressed on-disk data before decompression.
        let actual_checksum = checksum_block(reader);
        if actual_checksum != expected_checksum {
            bail!(
                "Cache corruption detected: checksum mismatch in blob file {:08}.blob (expected \
                 {:08x}, got {:08x})",
                seq,
                expected_checksum,
                actual_checksum
            );
        }

        let buffer = decompress_into_arc(uncompressed_length, reader)?;
        Ok(ArcBytes::from(buffer))
    }
441
442    /// Returns true if the database is empty.
443    pub fn is_empty(&self) -> bool {
444        self.inner.read().meta_files.is_empty()
445    }
446
    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
    /// time. The WriteBatch need to be committed with [`TurboPersistence::commit_write_batch`].
    /// Note that the WriteBatch might start writing data to disk while it's filled up with data.
    /// This data will only become visible after the WriteBatch is committed.
    pub fn write_batch<K: StoreKey + Send + Sync>(&self) -> Result<WriteBatch<K, S, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        // Claim the single-writer slot; it is released again at the end of
        // `commit_write_batch`.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        Ok(WriteBatch::new(
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
            self.config.family_configs,
        ))
    }
473
474    /// Clears all caches of the database.
475    pub fn clear_cache(&self) {
476        self.key_block_cache.clear();
477        self.value_block_cache.clear();
478        for meta in self.inner.write().meta_files.iter_mut() {
479            meta.clear_cache();
480        }
481    }
482
    /// Clears block caches of the database.
    /// Unlike [`TurboPersistence::clear_cache`], this leaves the per-meta-file
    /// caches untouched.
    pub fn clear_block_caches(&self) {
        self.key_block_cache.clear();
        self.value_block_cache.clear();
    }
488
489    /// Prefetches all SST files which are usually lazy loaded. This can be used to reduce latency
490    /// for the first queries after opening the database.
491    pub fn prepare_all_sst_caches(&self) {
492        for meta in self.inner.write().meta_files.iter_mut() {
493            meta.prepare_sst_cache();
494        }
495    }
496
497    fn open_log(&self) -> Result<BufWriter<File>> {
498        if self.read_only {
499            unreachable!("Only write operations can open the log file");
500        }
501        let log_path = self.path.join("LOG");
502        let log_file = OpenOptions::new()
503            .create(true)
504            .append(true)
505            .open(log_path)?;
506        Ok(BufWriter::new(log_file))
507    }
508
    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
    /// visible to readers.
    pub fn commit_write_batch<K: StoreKey + Send + Sync>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            // Build an AMQF (approximate membership query filter) from the key
            // hashes accessed in this family since the last commit.
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // len is only a snapshot at that time and it can change while we create the filter.
            // So we give it 5% more space to make resizes less likely.
            let initial_capacity = set.len() * 20 / 19;
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // This drains items from the set. But due to concurrency it might not be empty
            // afterwards, but that's fine. It will be part of the next commit.
            set.retain(|hash| {
                // Performance-wise it would usually be better to insert sorted fingerprints, but we
                // assume that hashes are equally distributed, which makes it unnecessary.
                // Good for cache locality is that we insert in the order of the dashset's buckets.
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            // A write batch only adds files; nothing is scheduled for deletion.
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        // Release the single-writer slot claimed in `write_batch`.
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }
557
    /// fsyncs the new files and updates the CURRENT file. Updates the database state to include the
    /// new files.
    ///
    /// Crash-consistency ordering: (1) sync all new files, (2) write+sync the *.del file,
    /// (3) write+sync CURRENT (the commit point), (4) physically delete obsolete files.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        // Make sure all new files are durably on disk before they become
        // reachable via CURRENT.
        let sync_span = tracing::info_span!("sync new files").entered();
        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Newer meta files can mark SSTs of older ones as obsolete, so the
        // filter is applied newest-to-oldest.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;
        drop(sync_span);

        // Snapshot the info needed for the LOG output before `new_meta_files`
        // is moved into the database state below.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        // Update the in-memory state under the write lock; readers see the new
        // files from here on.
        {
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // apply_and_get_remove need to run in reverse order
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            // A *.del file consumes an extra sequence number of its own.
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // Write *.del file, marking the selected files as to delete
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            // Commit point: once CURRENT holds the new sequence number, the
            // new files are part of the database after a restart.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            // Physical removal happens last; the *.del file covers a crash in
            // between (see `load_directory`).
            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            // Append a human-readable summary of this commit to the LOG file.
            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ         | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST    | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                // Writes sequence numbers 15 per line, aligned with the table
                // layout above.
                fn write_seq_numbers<W: std::io::Write, T, I>(
                    log: &mut W,
                    items: I,
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()>
                where
                    I: IntoIterator<Item = T>,
                {
                    let items: Vec<T> = items.into_iter().collect();
                    for chunk in items.chunks(15) {
                        write!(log, "    |          |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(&mut log, sst_seq_numbers_to_delete, "SST DELETED", |&seq| {
                    seq
                })?;
                write_seq_numbers(
                    &mut log,
                    meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                #[cfg(feature = "verbose_log")]
                {
                    writeln!(log, "New database state:")?;
                    writeln!(log, "FAM | META SEQ | SST SEQ  FLAGS | RANGE")?;
                    let inner = self.inner.read();
                    // Deduplicate family ids while preserving first-seen order.
                    let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                        let mut set = HashSet::new();
                        move |family| set.insert(*family)
                    });
                    for family in families {
                        for meta in inner.meta_files.iter() {
                            if meta.family() != family {
                                continue;
                            }
                            let meta_seq = meta.sequence_number();
                            for entry in meta.entries().iter() {
                                let seq = entry.sequence_number();
                                let range = entry.range();
                                writeln!(
                                    log,
                                    "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                    entry.flags(),
                                    range_to_str(range.min_hash, range.max_hash)
                                )?;
                            }
                        }
                    }
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }
792
793    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
794    /// duplicate keys and separating all key ranges into unique files.
795    pub fn full_compact(&self) -> Result<()> {
796        self.compact(&CompactConfig {
797            min_merge_count: 2,
798            optimal_merge_count: usize::MAX,
799            max_merge_count: usize::MAX,
800            max_merge_bytes: u64::MAX,
801            min_merge_duplication_bytes: 0,
802            optimal_merge_duplication_bytes: u64::MAX,
803            max_merge_segment_count: usize::MAX,
804        })?;
805        Ok(())
806    }
807
    /// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
    /// files is above the given threshold. The coverage is the average number of SST files that
    /// need to be read to find a key. It also limits the maximum number of SST files that are
    /// merged at once, which is the main factor for the runtime of the compaction.
    ///
    /// Returns `true` if any files were merged (i.e. a commit was performed), `false` if the
    /// selector found nothing worth merging.
    ///
    /// # Errors
    ///
    /// Fails on a read-only database, when another write batch or compaction is already active,
    /// or when the internal merge or the final commit fails.
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        // Claim the single-writer slot. Only one write batch or compaction may run at a
        // time; the AcqRel success ordering pairs with the Release store at the end.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }

        // Free block caches and SST mmaps before compaction. The block caches
        // are not used during compaction (we iterate uncached), and any cached
        // SST mmaps would use MADV_RANDOM which is wrong for sequential scans.
        // Clearing them upfront frees memory for the merge work.
        self.clear_cache();

        // Outputs collected from `compact_internal`, later handed to `commit`.
        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        // Scope the read lock on `inner` to the merge phase only. The current sequence
        // number is snapshotted into an atomic so parallel merge jobs can allocate fresh
        // sequence numbers concurrently.
        {
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        // NOTE(review): if `compact_internal` or `commit` fails via `?` above/below, the
        // `active_write_operation` flag is never reset, leaving the database locked for
        // further writes — confirm whether this poisoning on failure is intentional.
        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                // `get_mut` is safe here: the parallel merge work is finished, so we hold
                // the only reference to the atomic.
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        // Release the single-writer slot claimed above.
        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }
875
    /// Internal function to perform a compaction.
    ///
    /// Selects merge segments per family (via `get_merge_segments`), merges the selected SST
    /// files in parallel, writes one new meta file per family that had merge work, and reports
    /// the results through the `&mut` output parameters:
    /// - `new_meta_files` / `new_sst_files`: freshly written files, keyed by sequence number.
    /// - `sst_seq_numbers_to_delete` / `blob_seq_numbers_to_delete`: files made obsolete.
    /// - `keys_written`: total number of entries written into new SST files.
    ///
    /// `sequence_number` is shared with the parallel merge jobs so each can allocate unique
    /// sequence numbers. Does nothing when `meta_files` is empty.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        // Lightweight descriptor of one SST entry, remembering where it lives
        // (`meta_index` / `index_in_meta`) so the merge jobs can re-resolve the
        // full `MetaEntry` later.
        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
            flags: MetaEntryFlags,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }

            fn category(&self) -> u8 {
                // Cold and non-cold files are placed separately so we pass different category
                // values to ensure they are not merged together.
                if self.flags.cold() { 1 } else { 0 }
            }
        }

        // Flatten all meta files into one descriptor per SST entry.
        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                        flags: entry.flags(),
                    })
            })
            .collect::<Vec<_>>();

        // Group the descriptors by family; compaction never merges across families.
        let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let path = &self.path;

        // Serializes log-file writes across the per-family parallel jobs below.
        let log_mutex = Mutex::new(());

        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        // Plan the merge segments per family up front, consuming the shared
        // `max_merge_segment_count` budget family by family. Families after the
        // budget is exhausted are dropped from this run.
        let mut compact_config = compact_config.clone();
        let merge_jobs = sst_by_family
            .into_iter()
            .enumerate()
            .filter_map(|(family, ssts_with_ranges)| {
                if compact_config.max_merge_segment_count == 0 {
                    return None;
                }
                let (merge_jobs, real_merge_job_size) =
                    get_merge_segments(&ssts_with_ranges, &compact_config);
                compact_config.max_merge_segment_count -= real_merge_job_size;
                Some((family, ssts_with_ranges, merge_jobs))
            })
            .collect::<Vec<_>>();

        // Execute the planned merges: one outer parallel job per family, which in
        // turn fans out one inner parallel job per merge segment.
        let result = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                merge_jobs,
                |(family, ssts_with_ranges, merge_jobs)| {
                    let family = family as u32;

                    if merge_jobs.is_empty() {
                        return Ok(PartialResultPerFamily {
                            new_meta_file: None,
                            new_sst_files: Vec::new(),
                            sst_seq_numbers_to_delete: Vec::new(),
                            blob_seq_numbers_to_delete: Vec::new(),
                            keys_written: 0,
                        });
                    }

                    // Deserialize and merge used key hash filters per-family into
                    // a single filter. This avoids O(entries × N) filter probes
                    // during the merge loop. Empty filters (from commits with no
                    // reads) are discarded.
                    let used_key_hashes: Option<qfilter::Filter> = {
                        let filters: Vec<qfilter::Filter> = meta_files
                            .iter()
                            .filter(|m| m.family() == family)
                            .filter_map(|meta_file| {
                                meta_file.deserialize_used_key_hashes_amqf().transpose()
                            })
                            .collect::<Result<Vec<_>>>()?
                            .into_iter()
                            .filter(|amqf| !amqf.is_empty())
                            .collect();
                        if filters.is_empty() {
                            None
                        } else if filters.len() == 1 {
                            // Just directly use the single item
                            filters.into_iter().next()
                        } else {
                            let total_len: u64 = filters.iter().map(|f| f.len()).sum();
                            let mut merged =
                                qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
                                    .expect("Failed to create merged AMQF filter");
                            for filter in &filters {
                                merged
                                    .merge(false, filter)
                                    .expect("Failed to merge AMQF filters");
                            }
                            merged.shrink_to_fit();
                            Some(merged)
                        }
                    };

                    // Later we will remove the merged files
                    // (single-file segments are moved, not merged, so they are kept).
                    let sst_seq_numbers_to_delete = merge_jobs
                        .iter()
                        .filter(|l| l.len() > 1)
                        .flat_map(|l| l.iter().copied())
                        .map(|index| ssts_with_ranges[index].seq)
                        .collect::<Vec<_>>();

                    // Merge SST files
                    let span = tracing::trace_span!("merge files");
                    enum PartialMergeResult<'l> {
                        Merged {
                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                            blob_seq_numbers_to_delete: Vec<u32>,
                            keys_written: u64,
                            indices: SmallVec<[usize; 1]>,
                        },
                        Move {
                            seq: u32,
                            meta: StaticSortedFileBuilderMeta<'l>,
                        },
                    }
                    let merge_result = self
                        .parallel_scheduler
                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
                            let _span = span.clone().entered();
                            if indices.len() == 1 {
                                // If we only have one file, we can just move it
                                let index = indices[0];
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta_file = &meta_files[meta_index];
                                let entry = meta_file.entry(index_in_meta);
                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
                                let meta = StaticSortedFileBuilderMeta {
                                    min_hash: entry.min_hash(),
                                    max_hash: entry.max_hash(),
                                    amqf,
                                    block_count: entry.block_count(),
                                    size: entry.size(),
                                    flags: entry.flags(),
                                    // Moved files are not rewritten, so they contribute
                                    // nothing to `keys_written`.
                                    entries: 0,
                                };
                                return Ok(PartialMergeResult::Move {
                                    seq: entry.sequence_number(),
                                    meta,
                                });
                            }

                            // Open SST files independently for compaction.
                            // Uses MADV_SEQUENTIAL for better OS page management
                            // and avoids caching mmaps on MetaEntry's OnceLock.
                            let iters = indices
                                .iter()
                                .map(|&index| {
                                    let meta_index = ssts_with_ranges[index].meta_index;
                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                    let entry = meta_files[meta_index].entry(index_in_meta);
                                    StaticSortedFile::open_for_compaction(
                                        path,
                                        entry.sst_metadata(),
                                    )?
                                    .try_into_iter()
                                })
                                .collect::<Result<Vec<_>>>()?;

                            let iter = MergeIter::new(iters.into_iter())?;

                            // TODO figure out how to delete blobs when they are no longer
                            // referenced
                            let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                            // Streaming writer for one output stream (warm or cold),
                            // rolling over to a new SST file whenever the current one
                            // is full at a key boundary.
                            struct Collector {
                                /// The active writer and its sequence number. `None` if no
                                /// entries have been added since the last flush. We defer
                                /// allocation to avoid creating empty SST files for collectors
                                /// that receive no entries (e.g., the unused_collector when
                                /// all keys are in the
                                /// used set).
                                writer: Option<(u32, StreamingSstWriter<LookupEntry>)>,
                                flags: MetaEntryFlags,
                                new_sst_files:
                                    Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                                /// Hash of the last key added. Used to ensure we only split
                                /// SST files at key boundaries (not mid-key-group for MultiValue).
                                last_hash: Option<u64>,
                            }
                            impl Collector {
                                fn new(flags: MetaEntryFlags) -> Self {
                                    Self {
                                        writer: None,
                                        flags,
                                        new_sst_files: Vec::new(),
                                        last_hash: None,
                                    }
                                }

                                /// Ensures a writer is open, creating one if needed.
                                fn ensure_writer(
                                    &mut self,
                                    path: &Path,
                                    sequence_number: &AtomicU32,
                                ) -> Result<&mut StreamingSstWriter<LookupEntry>>
                                {
                                    if self.writer.is_none() {
                                        let seq =
                                            sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                                        let sst_path = path.join(format!("{seq:08}.sst"));
                                        let writer = StreamingSstWriter::new(
                                            &sst_path,
                                            self.flags,
                                            MAX_ENTRIES_PER_COMPACTED_FILE as u64,
                                        )?;
                                        self.writer = Some((seq, writer));
                                    }
                                    Ok(&mut self.writer.as_mut().unwrap().1)
                                }

                                /// Closes the current SST file (flushing remaining blocks and
                                /// writing the index) and records it in the completed files
                                /// list.
                                fn close_sst_file(&mut self, keys_written: &mut u64) -> Result<()> {
                                    if let Some((seq, writer)) = self.writer.take() {
                                        let _span =
                                            tracing::trace_span!("close merged sst file").entered();
                                        let (meta, file) = writer.close()?;
                                        *keys_written += meta.entries;
                                        self.new_sst_files.push((seq, file, meta));
                                    }
                                    Ok(())
                                }

                                /// Adds an entry to the collector. Only splits the SST file at
                                /// key boundaries to avoid breaking key groups for MultiValue
                                /// families.
                                fn add_entry(
                                    &mut self,
                                    entry: LookupEntry,
                                    path: &Path,
                                    sequence_number: &AtomicU32,
                                    keys_written: &mut u64,
                                ) -> Result<()> {
                                    let key_changed = self.last_hash != Some(entry.hash);
                                    // Only check fullness at key boundaries to avoid splitting
                                    // a key group across two SST files.
                                    if key_changed
                                        && let Some((_, ref writer)) = self.writer
                                        && writer.is_full(
                                            MAX_ENTRIES_PER_COMPACTED_FILE,
                                            DATA_THRESHOLD_PER_COMPACTED_FILE,
                                        )
                                    {
                                        self.close_sst_file(keys_written)?;
                                    }
                                    self.last_hash = Some(entry.hash);
                                    let writer = self.ensure_writer(path, sequence_number)?;
                                    writer.add(entry)?;
                                    Ok(())
                                }
                            }
                            // Debug-only guard: every collector must be explicitly closed
                            // via `close_sst_file` before it goes out of scope.
                            #[cfg(debug_assertions)]
                            impl Drop for Collector {
                                fn drop(&mut self) {
                                    if !std::thread::panicking() {
                                        assert!(
                                            self.writer.is_none(),
                                            "Collector dropped with an open writer"
                                        );
                                    }
                                }
                            }
                            let mut used_collector = Collector::new(MetaEntryFlags::WARM);
                            let mut unused_collector = Collector::new(MetaEntryFlags::COLD);
                            let mut current_key: Option<ArcBytes> = None;
                            let mut keys_written = 0;

                            // MergeIter yields entries from newer SSTs first (by SST sequence
                            // number). Within each SST, tombstones sort last within key groups.
                            // Use a skip flag to handle:
                            // - SingleValue: skip all older entries after writing the first
                            // - MultiValue: skip all older entries after encountering a tombstone
                            //   (which signals deletion of all prior values for this key)
                            let mut skip_remaining_for_this_key = false;
                            let family_config = &self.config.family_configs[family as usize];

                            for entry in iter {
                                let entry = entry?;
                                if current_key.as_ref() != Some(&entry.key) {
                                    // we changed keys so undo this flag
                                    skip_remaining_for_this_key = false;
                                    current_key = Some(entry.key.clone());
                                }
                                if !skip_remaining_for_this_key {
                                    // Route the entry to the warm or cold output stream
                                    // depending on whether its hash was recently read.
                                    let is_used = used_key_hashes
                                        .as_ref()
                                        .is_some_and(|amqf| amqf.contains_fingerprint(entry.hash));
                                    let collector = if is_used {
                                        &mut used_collector
                                    } else {
                                        &mut unused_collector
                                    };
                                    match family_config.kind {
                                        FamilyKind::MultiValue => {
                                            // For MultiValue families we only skip remaining if we
                                            // see a tombstone
                                            if matches!(
                                                entry.value,
                                                LazyLookupValue::Eager(LookupValue::Deleted)
                                            ) {
                                                skip_remaining_for_this_key = true;
                                            }
                                        }
                                        FamilyKind::SingleValue => {
                                            // Since MergeItr is in newest to oldest order anything
                                            // else that comes out must be skipped
                                            skip_remaining_for_this_key = true;
                                        }
                                    }
                                    collector.add_entry(
                                        entry,
                                        path,
                                        sequence_number,
                                        &mut keys_written,
                                    )?;
                                }
                            }

                            // Close remaining writers
                            used_collector.close_sst_file(&mut keys_written)?;
                            unused_collector.close_sst_file(&mut keys_written)?;

                            let mut new_sst_files = take(&mut unused_collector.new_sst_files);
                            new_sst_files.append(&mut used_collector.new_sst_files);
                            Ok(PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written,
                                indices,
                            })
                        })
                        .with_context(|| {
                            format!("Failed to merge database files for family {family}")
                        })?;

                    // Pre-compute output sizes so the vectors below allocate once.
                    // `reduce` returns `None` only for an empty iterator, and
                    // `merge_jobs` was checked non-empty above.
                    let Some((sst_files_len, blob_delete_len)) = merge_result
                        .iter()
                        .map(|r| {
                            if let PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                indices: _,
                                keys_written: _,
                            } = r
                            {
                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                            } else {
                                (0, 0)
                            }
                        })
                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                    else {
                        unreachable!()
                    };

                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                    let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let mut meta_file_builder = MetaFileBuilder::new(family);

                    // Drain the merge results into the meta file builder while writing
                    // the human-readable compaction log under the shared log mutex.
                    let mut keys_written = 0;
                    self.parallel_scheduler.block_in_place(|| {
                        let guard = log_mutex.lock();
                        let mut log = self.open_log()?;
                        writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
                        for result in merge_result {
                            match result {
                                PartialMergeResult::Merged {
                                    new_sst_files: merged_new_sst_files,
                                    blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                                    keys_written: merged_keys_written,
                                    indices,
                                } => {
                                    writeln!(
                                        log,
                                        "{family:3} | {meta_seq:08} | MERGE \
                                         ({merged_keys_written} keys):"
                                    )?;
                                    for i in indices.iter() {
                                        let seq = ssts_with_ranges[*i].seq;
                                        let (min, max) = ssts_with_ranges[*i].range().into_inner();
                                        writeln!(
                                            log,
                                            "{family:3} | {meta_seq:08} | {seq:08} INPUT  | {}",
                                            range_to_str(min, max)
                                        )?;
                                    }
                                    for (seq, file, meta) in merged_new_sst_files {
                                        let min = meta.min_hash;
                                        let max = meta.max_hash;
                                        writeln!(
                                            log,
                                            "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
                                             ({})",
                                            range_to_str(min, max),
                                            meta.flags
                                        )?;

                                        meta_file_builder.add(seq, meta);
                                        new_sst_files.push((seq, file));
                                    }
                                    blob_seq_numbers_to_delete
                                        .extend(merged_blob_seq_numbers_to_delete);
                                    keys_written += merged_keys_written;
                                }
                                PartialMergeResult::Move { seq, meta } => {
                                    let min = meta.min_hash;
                                    let max = meta.max_hash;
                                    writeln!(
                                        log,
                                        "{family:3} | {meta_seq:08} | {seq:08} MOVED  | {}",
                                        range_to_str(min, max)
                                    )?;

                                    meta_file_builder.add(seq, meta);
                                }
                            }
                        }
                        drop(log);
                        drop(guard);

                        anyhow::Ok(())
                    })?;

                    // Record the merged-away SST files as obsolete in the new meta file.
                    for &seq in sst_seq_numbers_to_delete.iter() {
                        meta_file_builder.add_obsolete_sst_file(seq);
                    }

                    let meta_file = {
                        let _span = tracing::trace_span!("write meta file").entered();
                        self.parallel_scheduler
                            .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
                    };

                    Ok(PartialResultPerFamily {
                        new_meta_file: Some((meta_seq, meta_file)),
                        new_sst_files,
                        sst_seq_numbers_to_delete,
                        blob_seq_numbers_to_delete,
                        keys_written,
                    })
                },
            )?;

        // Fold the per-family results into the caller's output parameters.
        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }
1391
1392    /// Get a value from the database. Returns None if the key is not found. The returned value
1393    /// might hold onto a block of the database and it should not be hold long-term.
1394    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcBytes>> {
1395        debug_assert!(family < FAMILIES, "Family index out of bounds");
1396        if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1397            // This is an error in our caller so just panic
1398            panic!(
1399                "only single valued tables can be queried with `get', call `get_multiple` instead"
1400            )
1401        }
1402        let span = tracing::trace_span!(
1403            "database read",
1404            name = family,
1405            result_size = tracing::field::Empty
1406        )
1407        .entered();
1408        let results = self.get_impl::<K, false>(family, key, &span)?;
1409        debug_assert!(results.len() <= 1, "get() should return at most one result");
1410        Ok(results.into_iter().next())
1411    }
1412
1413    /// Looks up a key and returns all matching values.
1414    ///
1415    /// This is useful for keyspaces where keys are not unique and multiple mappings are possible.
1416    /// Unlike `get`, which returns only the first match, this method returns all
1417    /// entries with the same key from all SST files.  By default however we assume these
1418    /// collections are small and thus optimize for there being exactly 0 or 1 results.
1419    ///
1420    /// The order of returned values is undefined and duplicates are preserved. Callers must not
1421    /// rely on any particular ordering (neither insertion order nor byte order).
1422    pub fn get_multiple<K: QueryKey>(
1423        &self,
1424        family: usize,
1425        key: &K,
1426    ) -> Result<SmallVec<[ArcBytes; 1]>> {
1427        debug_assert!(family < FAMILIES, "Family index out of bounds");
1428        if self.config.family_configs[family].kind != FamilyKind::MultiValue {
1429            // This is an error in our caller so just panic
1430            panic!("only multi-valued tables can be queried with `get_multiple`")
1431        }
1432        let span = tracing::trace_span!(
1433            "database read multiple",
1434            name = family,
1435            result_count = tracing::field::Empty,
1436            result_size = tracing::field::Empty
1437        )
1438        .entered();
1439        let results = self.get_impl::<K, true>(family, key, &span)?;
1440        Ok(results)
1441    }
1442
    /// Shared implementation for `get` and `get_multiple`.
    ///
    /// If `FIND_ALL` is false, stops after finding the first match.
    /// If `FIND_ALL` is true, continues to find all matches across all meta files.
    ///
    /// Records `result_size` (and `result_count` when `FIND_ALL` is true) on `span`
    /// before returning.
    fn get_impl<K: QueryKey, const FIND_ALL: bool>(
        &self,
        family: usize,
        key: &K,
        span: &EnteredSpan,
    ) -> Result<SmallVec<[ArcBytes; 1]>> {
        let hash = hash_key(key);
        // Hold the read lock for the whole lookup so the set of meta files stays stable.
        let inner = self.inner.read();
        let mut output: SmallVec<[ArcBytes; 1]> = SmallVec::new();
        // Track whether we found the key in any SST (even if deleted).
        // Used for miss_global stat: only fires if key was never found anywhere.
        #[cfg(feature = "stats")]
        let mut found_in_sst = false;

        // Accumulated byte size of all collected values; recorded on the span at the end.
        let mut size = 0;

        // Iterate meta files newest-to-oldest so more recent writes (and tombstones)
        // shadow older ones.
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup::<K, FIND_ALL>(
                family as u32,
                hash,
                key,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                // Meta file has no data for this family.
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                // Hash falls outside the hash range covered by the meta file's entries.
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                // Quick filter (AMQF) ruled the key out without touching the SST.
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(values) => {
                        #[cfg(feature = "stats")]
                        {
                            found_in_sst = true;
                        }
                        // Record the accessed key hash; consumers of this set are
                        // outside this view (presumably compaction — verify there).
                        inner.accessed_key_hashes[family].insert(hash);
                        // Process values. Tombstones sort last within a key group,
                        // so when we see a tombstone, we can return immediately.
                        for value in values {
                            match value {
                                LookupValue::Deleted => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", "deleted");
                                        return Ok(SmallVec::new());
                                    }
                                    // Tombstone is last in key group. Return accumulated
                                    // values (from this SST and newer layers). Stop
                                    // searching older SSTs.
                                    if output.is_empty() {
                                        span.record("result_size", "deleted");
                                    } else {
                                        span.record("result_size", size);
                                    }
                                    return Ok(output);
                                }
                                // Value stored directly in the SST value block.
                                LookupValue::Slice { value } => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", value.len());
                                        return Ok(SmallVec::from_buf([value]));
                                    }
                                    size += value.len();
                                    output.push(value);
                                }
                                // Value stored out-of-line; fetch it by blob sequence number.
                                LookupValue::Blob { sequence_number } => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                                    let blob = self.read_blob(sequence_number)?;
                                    if !FIND_ALL {
                                        span.record("result_size", blob.len());
                                        return Ok(SmallVec::from_buf([blob]));
                                    }
                                    size += blob.len();
                                    output.push(blob);
                                }
                            }
                        }
                    }
                    // The SST was consulted but did not contain the key.
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }

        // Key never appeared in any SST, not even as a tombstone.
        #[cfg(feature = "stats")]
        if !found_in_sst {
            self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        }

        if FIND_ALL {
            span.record("result_count", output.len());
        }
        if output.is_empty() {
            span.record("result_size", "not_found");
        } else {
            span.record("result_size", size);
        }
        Ok(output)
    }
1558
    /// Looks up many keys at once in a single-value family.
    ///
    /// Returns one `Option<ArcBytes>` per input key, in the same order as `keys`
    /// (`None` for keys that are missing or deleted).
    pub fn batch_get<K: QueryKey>(
        &self,
        family: usize,
        keys: &[K],
    ) -> Result<Vec<Option<ArcBytes>>> {
        debug_assert!(family < FAMILIES, "Family index out of bounds");
        if self.config.family_configs[family].kind != FamilyKind::SingleValue {
            // This is an error in our caller so just panic
            panic!("only single valued tables can be queried with `batch_get'")
        }
        let span = tracing::trace_span!(
            "database batch read",
            name = family,
            keys = keys.len(),
            not_found = tracing::field::Empty,
            deleted = tracing::field::Empty,
            result_size = tracing::field::Empty
        )
        .entered();
        // One cell per key: (key hash, original index, lookup result once resolved).
        let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
        // Number of cells still unresolved; lets us stop scanning early.
        let mut empty_cells = keys.len();
        for (index, key) in keys.iter().enumerate() {
            let hash = hash_key(key);
            cells.push((hash, index, None));
        }
        // Sort cells by hash so each meta file can be scanned in hash order.
        cells.sort_by_key(|(hash, _, _)| *hash);
        let inner = self.inner.read();
        // Iterate meta files newest-to-oldest so fresher entries resolve a cell first.
        for meta in inner.meta_files.iter().rev() {
            let _result = meta.batch_lookup(
                family as u32,
                keys,
                &mut cells,
                &mut empty_cells,
                &self.key_block_cache,
                &self.value_block_cache,
            )?;

            // Fold the per-meta-file miss/hit counters into the global stats.
            #[cfg(feature = "stats")]
            {
                let crate::meta_file::MetaBatchLookupResult {
                    family_miss,
                    range_misses,
                    quick_filter_misses,
                    sst_misses,
                    hits: _,
                } = _result;
                if family_miss {
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                if range_misses > 0 {
                    self.stats
                        .miss_range
                        .fetch_add(range_misses as u64, Ordering::Relaxed);
                }
                if quick_filter_misses > 0 {
                    self.stats
                        .miss_amqf
                        .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
                }
                if sst_misses > 0 {
                    self.stats
                        .miss_key
                        .fetch_add(sst_misses as u64, Ordering::Relaxed);
                }
            }

            // All keys resolved; no need to consult older meta files.
            if empty_cells == 0 {
                break;
            }
        }
        let mut deleted = 0;
        let mut not_found = 0;
        let mut result_size = 0;
        let mut results = vec![None; keys.len()];
        // Scatter resolved cells back into the caller's original key order.
        for (hash, index, result) in cells {
            if let Some(result) = result {
                inner.accessed_key_hashes[family].insert(hash);
                let result = match result {
                    LookupValue::Deleted => {
                        #[cfg(feature = "stats")]
                        self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                        deleted += 1;
                        None
                    }
                    LookupValue::Slice { value } => {
                        #[cfg(feature = "stats")]
                        self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                        result_size += value.len();
                        Some(value)
                    }
                    LookupValue::Blob { sequence_number } => {
                        #[cfg(feature = "stats")]
                        self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                        let blob = self.read_blob(sequence_number)?;
                        result_size += blob.len();
                        Some(blob)
                    }
                };
                results[index] = result;
            } else {
                #[cfg(feature = "stats")]
                self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
                not_found += 1;
            }
        }
        span.record("not_found", not_found);
        span.record("deleted", deleted);
        span.record("result_size", result_size);
        Ok(results)
    }
1669
1670    /// Returns database statistics.
1671    #[cfg(feature = "stats")]
1672    pub fn statistics(&self) -> Statistics {
1673        let inner = self.inner.read();
1674        Statistics {
1675            meta_files: inner.meta_files.len(),
1676            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
1677            key_block_cache: CacheStatistics::new(&self.key_block_cache),
1678            value_block_cache: CacheStatistics::new(&self.value_block_cache),
1679            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
1680                + self.stats.hits_small.load(Ordering::Relaxed)
1681                + self.stats.hits_blob.load(Ordering::Relaxed),
1682            misses: self.stats.miss_global.load(Ordering::Relaxed),
1683            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
1684            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
1685            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
1686            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
1687        }
1688    }
1689
1690    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1691        Ok(self
1692            .inner
1693            .read()
1694            .meta_files
1695            .iter()
1696            .rev()
1697            .map(|meta_file| {
1698                let entries = meta_file
1699                    .entries()
1700                    .iter()
1701                    .map(|entry| {
1702                        let amqf = entry.raw_amqf(meta_file.amqf_data());
1703                        MetaFileEntryInfo {
1704                            sequence_number: entry.sequence_number(),
1705                            min_hash: entry.min_hash(),
1706                            max_hash: entry.max_hash(),
1707                            sst_size: entry.size(),
1708                            flags: entry.flags(),
1709                            amqf_size: entry.amqf_size(),
1710                            amqf_entries: amqf.len(),
1711                            block_count: entry.block_count(),
1712                        }
1713                    })
1714                    .collect();
1715                MetaFileInfo {
1716                    sequence_number: meta_file.sequence_number(),
1717                    family: meta_file.family(),
1718                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1719                    entries,
1720                }
1721            })
1722            .collect())
1723    }
1724
1725    /// Shuts down the database. This will print statistics if the `print_stats` feature is enabled.
1726    pub fn shutdown(&self) -> Result<()> {
1727        #[cfg(feature = "print_stats")]
1728        println!("{:#?}", self.statistics());
1729        Ok(())
1730    }
1731}
1732
/// Renders a hash range as a fixed-width ASCII bar for compaction log output,
/// followed by the raw hex bounds.
///
/// The full `u64` hash space is mapped onto `DISPLAY_SIZE` columns: `[` and `]`
/// mark the scaled endpoints (`O` when both fall in the same column) and `=`
/// fills the interior.
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // Scale a hash into a display column. Clamp to the last column so hashes at
    // (or near) u64::MAX still render their bracket instead of landing at index
    // DISPLAY_SIZE, which is outside the loop below and used to drop the bracket.
    let to_pos = |hash: u64| {
        ((hash as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize).min(DISPLAY_SIZE - 1)
    };
    let start_pos = to_pos(min);
    let end_pos = to_pos(max);
    // Bar (DISPLAY_SIZE chars) plus " | " and two 16-digit hex values.
    let mut range_str = String::with_capacity(DISPLAY_SIZE + 40);
    for i in 0..DISPLAY_SIZE {
        if i == start_pos && i == end_pos {
            range_str.push('O');
        } else if i == start_pos {
            range_str.push('[');
        } else if i == end_pos {
            range_str.push(']');
        } else if i > start_pos && i < end_pos {
            range_str.push('=');
        } else {
            range_str.push(' ');
        }
    }
    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
    range_str
}
1756
/// Summary of a single meta file, as returned by `meta_info`.
pub struct MetaFileInfo {
    /// Sequence number of the meta file itself.
    pub sequence_number: u32,
    /// Key family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files this meta file marks as obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// One entry per SST file referenced by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
1763
/// Summary of a single SST entry within a meta file, as reported by `meta_info`.
pub struct MetaFileEntryInfo {
    /// Sequence number of the referenced SST file.
    pub sequence_number: u32,
    /// Smallest key hash covered by the SST file.
    pub min_hash: u64,
    /// Largest key hash covered by the SST file.
    pub max_hash: u64,
    /// Serialized size of the AMQF (quick filter) as recorded in the meta entry.
    pub amqf_size: u32,
    /// Number of entries in the AMQF.
    pub amqf_entries: usize,
    /// Size of the SST file as recorded in the meta entry (presumably bytes — confirm).
    pub sst_size: u64,
    /// Flags recorded for this entry in the meta file.
    pub flags: MetaEntryFlags,
    /// Number of blocks in the SST file.
    pub block_count: u16,
}
1773}