// turbo_persistence/db.rs

use std::{
    any::{Any, TypeId},
    borrow::Cow,
    collections::HashSet,
    fs::{self, File, OpenOptions, ReadDir},
    io::{BufWriter, Write},
    mem::{MaybeUninit, swap, transmute},
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU32, Ordering},
    },
};

use anyhow::{Context, Result, bail};
use byteorder::{BE, ReadBytesExt, WriteBytesExt};
use jiff::Timestamp;
use lzzzz::lz4::decompress;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use tracing::Span;

pub use crate::compaction::selector::CompactConfig;
use crate::{
    QueryKey,
    arc_slice::ArcSlice,
    compaction::selector::{Compactable, compute_metrics, get_merge_segments},
    constants::{
        AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
        KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
        VALUE_BLOCK_CACHE_SIZE,
    },
    key::{StoreKey, hash_key},
    lookup_entry::{LookupEntry, LookupValue},
    merge_iter::MergeIter,
    meta_file::{AqmfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
    meta_file_builder::MetaFileBuilder,
    sst_filter::SstFilter,
    static_sorted_file::{BlockCache, SstLookupResult},
    static_sorted_file_builder::{StaticSortedFileBuilder, StaticSortedFileBuilderMeta},
    write_batch::{FinishResult, WriteBatch},
};

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    pub hit_rate: f32,
    pub fill: f32,
    pub items: usize,
    pub size: u64,
    pub hits: u64,
    pub misses: u64,
}

#[cfg(feature = "stats")]
impl CacheStatistics {
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
            hit_rate: hits as f32 / (hits + misses) as f32,
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    pub meta_files: usize,
    pub sst_files: usize,
    pub key_block_cache: CacheStatistics,
    pub value_block_cache: CacheStatistics,
    pub aqmf_cache: CacheStatistics,
    pub hits: u64,
    pub misses: u64,
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_aqmf: u64,
    pub miss_key: u64,
}

#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_aqmf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}

/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
/// using a single write batch. It allows for concurrent reads.
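///
/// # Example
///
/// A minimal usage sketch (illustrative only; `MyKey` stands for any key type implementing
/// [`StoreKey`] and [`QueryKey`], and a single key family is assumed):
///
/// ```ignore
/// let db = TurboPersistence::open(PathBuf::from("/path/to/db"))?;
/// let batch = db.write_batch::<MyKey, 1>()?;
/// // ... fill the batch (see `WriteBatch`) ...
/// db.commit_write_batch(batch)?;
/// if let Some(value) = db.get(0, &my_key)? {
///     // use `value`
/// }
/// db.shutdown()?;
/// ```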
pub struct TurboPersistence {
    /// The path to the directory where the database is stored
    path: PathBuf,
    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
    /// no modification on the database is performed.
    read_only: bool,
    /// The inner state of the database. Writing will update that.
    inner: RwLock<Inner>,
    /// A cache for the last WriteBatch. It is used to avoid reallocation of buffers for the
    /// WriteBatch.
    idle_write_batch: Mutex<Option<(TypeId, Box<dyn Any + Send + Sync>)>>,
    /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent
    /// write operations.
    active_write_operation: AtomicBool,
    /// A cache for deserialized AQMF filters.
    aqmf_cache: AqmfCache,
    /// A cache for decompressed key blocks.
    key_block_cache: BlockCache,
    /// A cache for decompressed value blocks.
    value_block_cache: BlockCache,
    /// Statistics for the database.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}

/// The inner state of the database.
struct Inner {
    /// The list of meta files in the database. This is used to derive the SST files.
    meta_files: Vec<MetaFile>,
    /// The current sequence number for the database.
    current_sequence_number: u32,
}

pub struct CommitOptions {
    new_meta_files: Vec<(u32, File)>,
    new_sst_files: Vec<(u32, File)>,
    new_blob_files: Vec<(u32, File)>,
    sst_seq_numbers_to_delete: Vec<u32>,
    blob_seq_numbers_to_delete: Vec<u32>,
    sequence_number: u32,
    keys_written: u64,
}

impl TurboPersistence {
    fn new(path: PathBuf, read_only: bool) -> Self {
        Self {
            path,
            read_only,
            inner: RwLock::new(Inner {
                meta_files: Vec::new(),
                current_sequence_number: 0,
            }),
            idle_write_batch: Mutex::new(None),
            active_write_operation: AtomicBool::new(false),
            aqmf_cache: AqmfCache::with(
                AQMF_CACHE_SIZE as usize / AQMF_AVG_SIZE,
                AQMF_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            key_block_cache: BlockCache::with(
                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
                KEY_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            value_block_cache: BlockCache::with(
                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
                VALUE_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            #[cfg(feature = "stats")]
            stats: TrackedStats::default(),
        }
    }

    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup if the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting files,
    /// so it's fast.
    pub fn open(path: PathBuf) -> Result<Self> {
        let mut db = Self::new(path, false);
        db.open_directory(false)?;
        Ok(db)
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only(path: PathBuf) -> Result<Self> {
        let mut db = Self::new(path, true);
        db.open_directory(true)?;
        Ok(db)
    }

    /// Performs the initial check on the database directory.
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }

    /// Creates the directory and initializes it.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }

    /// Initializes the directory by creating the CURRENT file.
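    /// `CURRENT` stores the highest committed sequence number as a big-endian `u32`; a fresh
    /// database starts at 0.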
    fn init_directory(&mut self) -> Result<()> {
        let mut current = File::create(self.path.join("CURRENT"))?;
        current.write_u32::<BE>(0)?;
        current.flush()?;
        Ok(())
    }

    /// Loads an existing database directory and performs cleanup if necessary.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    // Remove the files that are marked for deletion
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // ignore blobs and sst, they are read when needed
                        }
                        _ => {
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read
                    }
                    Some("LOG") => {
                        // Ignored, write-only
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let span = Span::current();
        let mut meta_files = meta_files
            .into_par_iter()
            .with_min_len(1)
            .map(|seq| {
                let _span = span.enter();
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })
            .collect::<Result<Vec<MetaFile>>>()?;

        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }

    /// Reads and decompresses a blob file. This is not backed by any cache.
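    /// The on-disk layout this reads is a 4-byte big-endian uncompressed length followed by an
    /// LZ4-compressed payload.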
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()? as usize;

        let buffer = Arc::new_zeroed_slice(uncompressed_length);
        // Safety: MaybeUninit<u8> can be safely transmuted to u8.
        let mut buffer = unsafe { transmute::<Arc<[MaybeUninit<u8>]>, Arc<[u8]>>(buffer) };
        // Safety: We know that the buffer is not shared yet.
        let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
        decompress(compressed, decompressed)?;
        Ok(ArcSlice::from(buffer))
    }

    /// Returns true if the database is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.read().meta_files.is_empty()
    }

    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
    /// time. The WriteBatch needs to be committed with [`TurboPersistence::commit_write_batch`].
    /// Note that the WriteBatch might start writing data to disk while it is being filled with
    /// data. This data will only become visible after the WriteBatch is committed.
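    ///
    /// # Example
    ///
    /// A sketch of the single-writer contract (key type and family count are assumptions):
    ///
    /// ```ignore
    /// let batch = db.write_batch::<MyKey, 1>()?;
    /// assert!(db.write_batch::<MyKey, 1>().is_err()); // a second writer is rejected
    /// db.commit_write_batch(batch)?;
    /// ```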
    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
    ) -> Result<WriteBatch<K, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operation is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        if let Some((ty, any)) = self.idle_write_batch.lock().take()
            && ty == TypeId::of::<WriteBatch<K, FAMILIES>>()
        {
            let mut write_batch = *any.downcast::<WriteBatch<K, FAMILIES>>().unwrap();
            write_batch.reset(current);
            return Ok(write_batch);
        }
        Ok(WriteBatch::new(self.path.clone(), current))
    }

    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }

    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
    /// visible to readers.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
        mut write_batch: WriteBatch<K, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish()?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        self.active_write_operation.store(false, Ordering::Release);
        self.idle_write_batch.lock().replace((
            TypeId::of::<WriteBatch<K, FAMILIES>>(),
            Box::new(write_batch),
        ));
        Ok(())
    }

    /// Fsyncs the new files, records deletions, updates the CURRENT file, and updates the
    /// in-memory database state to include the new files.
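    ///
    /// The steps, in order, are:
    /// 1. fsync all new meta, SST, and blob files,
    /// 2. update the in-memory state to include the new files,
    /// 3. write and fsync a `*.del` file naming the files to delete (if any),
    /// 4. write and fsync the new sequence number to `CURRENT`,
    /// 5. delete the obsolete files,
    /// 6. append a human-readable record to `LOG`.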
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            mut new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let mut new_meta_files = new_meta_files
            .into_par_iter()
            .with_min_len(1)
            .map(|(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })
            .collect::<Result<Vec<_>>>()?;

        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        for (_, file) in new_sst_files.iter() {
            file.sync_all()?;
        }
        for (_, file) in new_blob_files.iter() {
            file.sync_all()?;
        }

        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        (seq, range.min_hash, range.max_hash, size)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // apply_and_get_remove needs to run in reverse order
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        if has_delete_file {
            sst_seq_numbers_to_delete.sort_unstable();
            meta_seq_numbers_to_delete.sort_unstable();
            blob_seq_numbers_to_delete.sort_unstable();
            // Write a *.del file marking the selected files for deletion
            let mut buf = Vec::with_capacity(
                (sst_seq_numbers_to_delete.len()
                    + meta_seq_numbers_to_delete.len()
                    + blob_seq_numbers_to_delete.len())
                    * size_of::<u32>(),
            );
            for seq in sst_seq_numbers_to_delete.iter() {
                buf.write_u32::<BE>(*seq)?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                buf.write_u32::<BE>(*seq)?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                buf.write_u32::<BE>(*seq)?;
            }
            let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
            file.write_all(&buf)?;
            file.sync_all()?;
        }

        let mut current_file = OpenOptions::new()
            .write(true)
            .truncate(false)
            .read(false)
            .open(self.path.join("CURRENT"))?;
        current_file.write_u32::<BE>(seq)?;
        current_file.sync_all()?;

        for seq in sst_seq_numbers_to_delete.iter() {
            fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
        }
        for seq in meta_seq_numbers_to_delete.iter() {
            fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
        }
        for seq in blob_seq_numbers_to_delete.iter() {
            fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
        }

        {
            let mut log = self.open_log()?;
            writeln!(log, "Time {time}")?;
            let span = time.until(Timestamp::now())?;
            writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
            for (seq, family, ssts, obsolete) in new_meta_info {
                writeln!(log, "{seq:08} META family:{family}")?;
                for (seq, min, max, size) in ssts {
                    writeln!(
                        log,
                        "  {seq:08} SST  {min:016x}-{max:016x} {} MiB",
                        size / 1024 / 1024
                    )?;
                }
                for seq in obsolete {
                    writeln!(log, "  {seq:08} OBSOLETE SST")?;
                }
            }
            new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
            for (seq, _) in new_sst_files.iter() {
                writeln!(log, "{seq:08} NEW SST")?;
            }
            new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
            for (seq, _) in new_blob_files.iter() {
                writeln!(log, "{seq:08} NEW BLOB")?;
            }
            for seq in sst_seq_numbers_to_delete.iter() {
                writeln!(log, "{seq:08} SST DELETED")?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                writeln!(log, "{seq:08} META DELETED")?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                writeln!(log, "{seq:08} BLOB DELETED")?;
            }
        }

        Ok(())
    }

    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
    /// duplicate keys and separating all key ranges into unique files.
    pub fn full_compact(&self) -> Result<()> {
        self.compact(&CompactConfig {
            min_merge_count: 2,
            optimal_merge_count: usize::MAX,
            max_merge_count: usize::MAX,
            max_merge_bytes: u64::MAX,
            min_merge_duplication_bytes: 0,
            optimal_merge_duplication_bytes: u64::MAX,
            max_merge_segment_count: usize::MAX,
        })?;
        Ok(())
    }

    /// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
    /// files is above the given threshold. The coverage is the average number of SST files that
    /// need to be read to find a key. It also limits the maximum number of SST files that are
    /// merged at once, which is the main factor for the runtime of the compaction.
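    ///
    /// # Example
    ///
    /// A sketch of a bounded, partial compaction (the numbers are illustrative, not tuned
    /// defaults):
    ///
    /// ```ignore
    /// db.compact(&CompactConfig {
    ///     min_merge_count: 2,
    ///     optimal_merge_count: 8,
    ///     max_merge_count: 32,
    ///     max_merge_bytes: 256 * 1024 * 1024,
    ///     min_merge_duplication_bytes: 1024 * 1024,
    ///     optimal_merge_duplication_bytes: 16 * 1024 * 1024,
    ///     max_merge_segment_count: 8,
    /// })?;
    /// ```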
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<()> {
        if self.read_only {
            bail!("Compaction is not allowed on a read-only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operation is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        if !new_meta_files.is_empty() {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(())
    }

    /// Internal function to perform a compaction.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }
        }

        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                    })
            })
            .collect::<Vec<_>>();

        let families = ssts_with_ranges
            .iter()
            .map(|s| s.range.family)
            .max()
            .unwrap() as usize
            + 1;

        let mut sst_by_family = Vec::with_capacity(families);
        sst_by_family.resize_with(families, Vec::new);

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        let log_mutex = Mutex::new(());
        let span = Span::current();

        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        let result = sst_by_family
            .into_par_iter()
            .with_min_len(1)
            .enumerate()
            .map(|(family, ssts_with_ranges)| {
                let family = family as u32;
                let _span = span.clone().entered();

                let merge_jobs = get_merge_segments(&ssts_with_ranges, compact_config);

                if merge_jobs.is_empty() {
                    return Ok(PartialResultPerFamily {
                        new_meta_file: None,
                        new_sst_files: Vec::new(),
                        sst_seq_numbers_to_delete: Vec::new(),
                        blob_seq_numbers_to_delete: Vec::new(),
                        keys_written: 0,
                    });
                }

                {
                    let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
                    let guard = log_mutex.lock();
                    let mut log = self.open_log()?;
                    writeln!(
                        log,
                        "Compaction for family {family} (coverage: {}, overlap: {}, duplication: \
                         {} / {} MiB):",
                        metrics.coverage,
                        metrics.overlap,
                        metrics.duplication,
                        metrics.duplicated_size / 1024 / 1024
                    )?;
                    for job in merge_jobs.iter() {
                        writeln!(log, "  merge")?;
                        for i in job.iter() {
                            let seq = ssts_with_ranges[*i].seq;
                            let (min, max) = ssts_with_ranges[*i].range().into_inner();
                            writeln!(log, "    {seq:08} {min:016x}-{max:016x}")?;
                        }
                    }
                    drop(guard);
                }

                // Later we will remove the merged files
                let sst_seq_numbers_to_delete = merge_jobs
                    .iter()
                    .filter(|l| l.len() > 1)
                    .flat_map(|l| l.iter().copied())
                    .map(|index| ssts_with_ranges[index].seq)
                    .collect::<Vec<_>>();

                // Merge SST files
                let span = tracing::trace_span!("merge files");
                enum PartialMergeResult<'l> {
                    Merged {
                        new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                        blob_seq_numbers_to_delete: Vec<u32>,
                        keys_written: u64,
                    },
                    Move {
                        seq: u32,
                        meta: StaticSortedFileBuilderMeta<'l>,
                    },
                }
                let merge_result = merge_jobs
                    .into_par_iter()
                    .with_min_len(1)
                    .map(|indices| {
                        let _span = span.clone().entered();
                        if indices.len() == 1 {
                            // If we only have one file, we can just move it
                            let index = indices[0];
                            let meta_index = ssts_with_ranges[index].meta_index;
                            let index_in_meta = ssts_with_ranges[index].index_in_meta;
                            let meta_file = &meta_files[meta_index];
                            let entry = meta_file.entry(index_in_meta);
                            let aqmf = Cow::Borrowed(entry.raw_aqmf(meta_file.aqmf_data()));
                            let meta = StaticSortedFileBuilderMeta {
                                min_hash: entry.min_hash(),
                                max_hash: entry.max_hash(),
                                aqmf,
                                key_compression_dictionary_length: entry
                                    .key_compression_dictionary_length(),
                                value_compression_dictionary_length: entry
                                    .value_compression_dictionary_length(),
                                block_count: entry.block_count(),
                                size: entry.size(),
                                entries: 0,
                            };
                            return Ok(PartialMergeResult::Move {
                                seq: entry.sequence_number(),
                                meta,
                            });
                        }

                        fn create_sst_file(
                            entries: &[LookupEntry],
                            total_key_size: usize,
                            total_value_size: usize,
                            path: &Path,
                            seq: u32,
                        ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                        {
                            let _span = tracing::trace_span!("write merged sst file").entered();
                            let builder = StaticSortedFileBuilder::new(
                                entries,
                                total_key_size,
                                total_value_size,
                            )?;
                            let (meta, file) =
                                builder.write(&path.join(format!("{seq:08}.sst")))?;
                            Ok((seq, file, meta))
                        }

                        let mut new_sst_files = Vec::new();

                        // Iterate all SST files
                        let iters = indices
                            .iter()
                            .map(|&index| {
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta = &meta_files[meta_index];
                                meta.entry(index_in_meta)
                                    .sst(meta)?
                                    .iter(key_block_cache, value_block_cache)
                            })
                            .collect::<Result<Vec<_>>>()?;

                        let iter = MergeIter::new(iters.into_iter())?;

                        // TODO figure out how to delete blobs when they are no longer
                        // referenced
                        let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                        let mut keys_written = 0;

                        let mut total_key_size = 0;
                        let mut total_value_size = 0;
                        let mut current: Option<LookupEntry> = None;
                        let mut entries = Vec::new();
                        let mut last_entries = Vec::new();
                        let mut last_entries_total_sizes = (0, 0);
                        for entry in iter {
                            let entry = entry?;

                            // Remove duplicates
                            if let Some(current) = current.take() {
                                if current.key != entry.key {
                                    let key_size = current.key.len();
                                    let value_size = current.value.size_in_sst();
                                    total_key_size += key_size;
                                    total_value_size += value_size;

                                    if total_key_size + total_value_size
                                        > DATA_THRESHOLD_PER_COMPACTED_FILE
                                        || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
                                    {
                                        let (selected_total_key_size, selected_total_value_size) =
                                            last_entries_total_sizes;
                                        swap(&mut entries, &mut last_entries);
                                        last_entries_total_sizes = (
                                            total_key_size - key_size,
                                            total_value_size - value_size,
                                        );
                                        total_key_size = key_size;
                                        total_value_size = value_size;

                                        if !entries.is_empty() {
                                            let seq =
                                                sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                            keys_written += entries.len() as u64;
                                            new_sst_files.push(create_sst_file(
                                                &entries,
                                                selected_total_key_size,
                                                selected_total_value_size,
                                                path,
                                                seq,
                                            )?);

                                            entries.clear();
                                        }
                                    }

                                    entries.push(current);
                                } else {
                                    // Override value
                                }
                            }
                            current = Some(entry);
                        }
                        if let Some(entry) = current {
                            total_key_size += entry.key.len();
                            total_value_size += entry.value.size_in_sst();
                            entries.push(entry);
                        }

                        // If we have one set of entries left, write them to a new SST file
                        if last_entries.is_empty() && !entries.is_empty() {
                            let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                            keys_written += entries.len() as u64;
                            new_sst_files.push(create_sst_file(
                                &entries,
                                total_key_size,
                                total_value_size,
                                path,
                                seq,
                            )?);
                        } else
                        // If we have two sets of entries left, merge them and
                        // split it into two SST files, to avoid having a
                        // single SST file that is very small.
                        if !last_entries.is_empty() {
                            last_entries.append(&mut entries);

                            last_entries_total_sizes.0 += total_key_size;
                            last_entries_total_sizes.1 += total_value_size;

                            let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

                            let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                            let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                            keys_written += part1.len() as u64;
                            new_sst_files.push(create_sst_file(
                                part1,
                                // We don't know the exact sizes so we estimate them
                                last_entries_total_sizes.0 / 2,
                                last_entries_total_sizes.1 / 2,
                                path,
                                seq1,
                            )?);

                            keys_written += part2.len() as u64;
                            new_sst_files.push(create_sst_file(
                                part2,
                                last_entries_total_sizes.0 / 2,
                                last_entries_total_sizes.1 / 2,
                                path,
                                seq2,
                            )?);
                        }
                        Ok(PartialMergeResult::Merged {
                            new_sst_files,
                            blob_seq_numbers_to_delete,
                            keys_written,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
                    .with_context(|| {
                        format!("Failed to merge database files for family {family}")
                    })?;

                let Some((sst_files_len, blob_delete_len)) = merge_result
                    .iter()
                    .map(|r| {
                        if let PartialMergeResult::Merged {
                            new_sst_files,
                            blob_seq_numbers_to_delete,
                            keys_written: _,
                        } = r
                        {
                            (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                        } else {
                            (0, 0)
                        }
                    })
                    .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                else {
                    unreachable!()
                };

                let mut new_sst_files = Vec::with_capacity(sst_files_len);
                let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                let mut meta_file_builder = MetaFileBuilder::new(family);

                let mut keys_written = 0;
                for result in merge_result {
                    match result {
                        PartialMergeResult::Merged {
                            new_sst_files: merged_new_sst_files,
                            blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                            keys_written: merged_keys_written,
                        } => {
                            for (seq, file, meta) in merged_new_sst_files {
                                meta_file_builder.add(seq, meta);
                                new_sst_files.push((seq, file));
                            }
                            blob_seq_numbers_to_delete.extend(merged_blob_seq_numbers_to_delete);
                            keys_written += merged_keys_written;
                        }
                        PartialMergeResult::Move { seq, meta } => {
                            meta_file_builder.add(seq, meta);
                        }
                    }
                }

                for &seq in sst_seq_numbers_to_delete.iter() {
                    meta_file_builder.add_obsolete_sst_file(seq);
                }

                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                let meta_file = {
                    let _span = tracing::trace_span!("write meta file").entered();
                    meta_file_builder.write(&self.path, seq)?
                };

                Ok(PartialResultPerFamily {
                    new_meta_file: Some((seq, meta_file)),
                    new_sst_files,
                    sst_seq_numbers_to_delete,
                    blob_seq_numbers_to_delete,
                    keys_written,
                })
            })
            .collect::<Result<Vec<_>>>()?;

        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }

    /// Get a value from the database. Returns None if the key is not found. The returned value
    /// might hold onto a block of the database, so it should not be held long-term.
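    ///
    /// # Example
    ///
    /// A sketch (family `0`; `my_key` is any value implementing [`QueryKey`]; assumes
    /// `ArcSlice<u8>` derefs to `[u8]`):
    ///
    /// ```ignore
    /// if let Some(value) = db.get(0, &my_key)? {
    ///     // Copy the bytes out if they must outlive the read.
    ///     let bytes: Vec<u8> = value.to_vec();
    /// }
    /// ```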
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.aqmf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_aqmf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => match result {
                        LookupValue::Deleted => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                            return Ok(None);
                        }
                        LookupValue::Slice { value } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                            return Ok(Some(value));
                        }
                        LookupValue::Blob { sequence_number } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                            let blob = self.read_blob(sequence_number)?;
                            return Ok(Some(blob));
                        }
                    },
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }

    /// Returns database statistics.
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            aqmf_cache: CacheStatistics::new(&self.aqmf_cache),
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_aqmf: self.stats.miss_aqmf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }

    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
        Ok(self
            .inner
            .read()
            .meta_files
            .iter()
            .rev()
            .map(|meta_file| {
                let entries = meta_file
                    .entries()
                    .iter()
                    .map(|entry| {
                        let aqmf = entry.raw_aqmf(meta_file.aqmf_data());
                        MetaFileEntryInfo {
                            sequence_number: entry.sequence_number(),
                            min_hash: entry.min_hash(),
                            max_hash: entry.max_hash(),
                            sst_size: entry.size(),
                            aqmf_size: entry.aqmf_size(),
                            aqmf_entries: aqmf.len(),
                            key_compression_dictionary_size: entry
                                .key_compression_dictionary_length(),
                            value_compression_dictionary_size: entry
                                .value_compression_dictionary_length(),
                            block_count: entry.block_count(),
                        }
                    })
                    .collect();
                MetaFileInfo {
                    sequence_number: meta_file.sequence_number(),
                    family: meta_file.family(),
                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
                    entries,
                }
            })
            .collect())
    }

    /// Shuts down the database. This will print statistics if the `print_stats` feature is
    /// enabled.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
}

pub struct MetaFileInfo {
    pub sequence_number: u32,
    pub family: u32,
    pub obsolete_sst_files: Vec<u32>,
    pub entries: Vec<MetaFileEntryInfo>,
}

pub struct MetaFileEntryInfo {
    pub sequence_number: u32,
    pub min_hash: u64,
    pub max_hash: u64,
    pub aqmf_size: u32,
    pub aqmf_entries: usize,
    pub sst_size: u64,
    pub key_compression_dictionary_size: u16,
    pub value_compression_dictionary_size: u16,
    pub block_count: u16,
}