turbo_persistence/db.rs

use std::{
    borrow::Cow,
    collections::HashSet,
    fs::{self, File, OpenOptions, ReadDir},
    io::{BufWriter, Write},
    mem::swap,
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::atomic::{AtomicBool, AtomicU32, Ordering},
};

use anyhow::{Context, Result, bail};
use byteorder::{BE, ReadBytesExt, WriteBytesExt};
use jiff::Timestamp;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};

pub use crate::compaction::selector::CompactConfig;
use crate::{
    QueryKey,
    arc_slice::ArcSlice,
    compaction::selector::{Compactable, compute_metrics, get_merge_segments},
    compression::decompress_into_arc,
    constants::{
        AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
        KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
        VALUE_BLOCK_CACHE_SIZE,
    },
    key::{StoreKey, hash_key},
    lookup_entry::{LookupEntry, LookupValue},
    merge_iter::MergeIter,
    meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
    meta_file_builder::MetaFileBuilder,
    parallel_scheduler::ParallelScheduler,
    sst_filter::SstFilter,
    static_sorted_file::{BlockCache, SstLookupResult},
    static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
    write_batch::{FinishResult, WriteBatch},
};

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    pub hit_rate: f32,
    pub fill: f32,
    pub items: usize,
    pub size: u64,
    pub hits: u64,
    pub misses: u64,
}

#[cfg(feature = "stats")]
impl CacheStatistics {
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
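            // Note: this is NaN if the cache has never been queried
            // (hits + misses == 0).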
            hit_rate: hits as f32 / (hits + misses) as f32,
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    pub meta_files: usize,
    pub sst_files: usize,
    pub key_block_cache: CacheStatistics,
    pub value_block_cache: CacheStatistics,
    pub amqf_cache: CacheStatistics,
    pub hits: u64,
    pub misses: u64,
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}

#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}

/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
/// using a single write batch. It allows for concurrent reads.
pub struct TurboPersistence<S: ParallelScheduler> {
    parallel_scheduler: S,
    /// The path to the directory where the database is stored.
    path: PathBuf,
    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
    /// no modification of the database is performed.
    read_only: bool,
    /// The inner state of the database. Writing will update it.
    inner: RwLock<Inner>,
    /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent
    /// write operations.
    active_write_operation: AtomicBool,
    /// A cache for deserialized AMQF filters.
    amqf_cache: AmqfCache,
    /// A cache for decompressed key blocks.
    key_block_cache: BlockCache,
    /// A cache for decompressed value blocks.
    value_block_cache: BlockCache,
    /// Statistics for the database.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}

/// The inner state of the database.
struct Inner {
    /// The list of meta files in the database. This is used to derive the SST files.
    meta_files: Vec<MetaFile>,
    /// The current sequence number for the database.
    current_sequence_number: u32,
}

pub struct CommitOptions {
    new_meta_files: Vec<(u32, File)>,
    new_sst_files: Vec<(u32, File)>,
    new_blob_files: Vec<(u32, File)>,
    sst_seq_numbers_to_delete: Vec<u32>,
    blob_seq_numbers_to_delete: Vec<u32>,
    sequence_number: u32,
    keys_written: u64,
}

impl<S: ParallelScheduler + Default> TurboPersistence<S> {
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup if the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting
    /// files, so it's fast.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only(path: PathBuf) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, Default::default())
    }
}
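
// A minimal usage sketch (illustrative only): the API for adding entries to a
// `WriteBatch` lives in `write_batch.rs` and is assumed here, as are the
// `MyScheduler`, `MyKey`, `FAMILIES` and `some_key` placeholders.
//
// ```ignore
// let db: TurboPersistence<MyScheduler> = TurboPersistence::open(path)?;
// let batch = db.write_batch::<MyKey, FAMILIES>()?;
// // ... add entries to `batch` via the WriteBatch API ...
// db.commit_write_batch(batch)?;
// let value = db.get(0, &some_key)?; // concurrent reads are allowed
// db.shutdown()?;
// ```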

impl<S: ParallelScheduler> TurboPersistence<S> {
    fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
        Self {
            parallel_scheduler,
            path,
            read_only,
            inner: RwLock::new(Inner {
                meta_files: Vec::new(),
                current_sequence_number: 0,
            }),
            active_write_operation: AtomicBool::new(false),
            amqf_cache: AmqfCache::with(
                AMQF_CACHE_SIZE as usize / AMQF_AVG_SIZE,
                AMQF_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            key_block_cache: BlockCache::with(
                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
                KEY_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            value_block_cache: BlockCache::with(
                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
                VALUE_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            #[cfg(feature = "stats")]
            stats: TrackedStats::default(),
        }
    }

    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup if the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting
    /// files, so it's fast.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        let mut db = Self::new(path, false, parallel_scheduler);
        db.open_directory(false)?;
        Ok(db)
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only_with_parallel_scheduler(
        path: PathBuf,
        parallel_scheduler: S,
    ) -> Result<Self> {
        let mut db = Self::new(path, true, parallel_scheduler);
        // Pass read_only = true so that opening never deletes or creates files.
        db.open_directory(true)?;
        Ok(db)
    }

    /// Performs the initial check on the database directory.
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }

    /// Creates the directory and initializes it.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }

    /// Initializes the directory by creating the CURRENT file.
    fn init_directory(&mut self) -> Result<()> {
        let mut current = File::create(self.path.join("CURRENT"))?;
        current.write_u32::<BE>(0)?;
        current.flush()?;
        Ok(())
    }
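
    // The CURRENT file holds a single big-endian u32: the highest committed
    // sequence number. Files with a greater sequence number are leftovers from
    // an interrupted commit and are removed on open. Reading it back is a
    // two-liner (this is exactly what `load_directory` below does):
    //
    // ```ignore
    // let mut current_file = File::open(self.path.join("CURRENT"))?;
    // let current = current_file.read_u32::<BE>()?;
    // ```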

    /// Loads an existing database directory and performs cleanup if necessary.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    // Remove the files that are marked for deletion
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // ignore blobs and sst, they are read when needed
                        }
                        _ => {
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read
                    }
                    Some("LOG") => {
                        // Ignored, write-only
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }

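    // A `*.del` file is a flat list of big-endian u32 sequence numbers. Each
    // entry marks `{seq:08}.sst`, `{seq:08}.meta`, and `{seq:08}.blob` (when
    // present) as deleted. A standalone sketch of decoding one, mirroring the
    // loop in `load_directory` (`del_path` is assumed to point at such a file):
    //
    // ```ignore
    // let mut content: &[u8] = &fs::read(&del_path)?;
    // let mut deleted = Vec::new();
    // while !content.is_empty() {
    //     deleted.push(content.read_u32::<BE>()?);
    // }
    // ```
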
    /// Reads and decompresses a blob file. This is not backed by any cache.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()?;

        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
        Ok(ArcSlice::from(buffer))
    }

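    // Blob file layout, as consumed by `read_blob` above: a big-endian u32
    // holding the uncompressed length, followed by the compressed payload for
    // the remainder of the file. A hypothetical write-side sketch of the same
    // layout (the actual writer and compression scheme live elsewhere, e.g. in
    // the write batch and `compression.rs`):
    //
    // ```ignore
    // let mut file = File::create(path.join(format!("{seq:08}.blob")))?;
    // file.write_u32::<BE>(uncompressed_len as u32)?;
    // file.write_all(&compressed_payload)?;
    // file.sync_all()?;
    // ```
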
    /// Returns true if the database is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.read().meta_files.is_empty()
    }

    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
    /// time. The WriteBatch needs to be committed with [`TurboPersistence::commit_write_batch`].
    /// Note that the WriteBatch might start writing data to disk while it's filled up with data.
    /// This data will only become visible after the WriteBatch is committed.
    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
    ) -> Result<WriteBatch<K, S, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (only a single write \
                 operation is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        Ok(WriteBatch::new(
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
        ))
    }

    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }

    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make
    /// it visible to readers.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish()?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }

    /// fsyncs the new files and updates the CURRENT file. Updates the database state to include
    /// the new files.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            mut new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;

        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        (seq, range.min_hash, range.max_hash, size)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // apply_and_get_remove needs to run in reverse order
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // Write the *.del file, marking the selected files as to be deleted
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                for (seq, family, ssts, obsolete) in new_meta_info {
                    writeln!(log, "{seq:08} META family:{family}")?;
                    for (seq, min, max, size) in ssts {
                        writeln!(
                            log,
                            "  {seq:08} SST  {min:016x}-{max:016x} {} MiB",
                            size / 1024 / 1024
                        )?;
                    }
                    for seq in obsolete {
                        writeln!(log, "  {seq:08} OBSOLETE SST")?;
                    }
                }
                new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_sst_files.iter() {
                    writeln!(log, "{seq:08} NEW SST")?;
                }
                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_blob_files.iter() {
                    writeln!(log, "{seq:08} NEW BLOB")?;
                }
                for seq in sst_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} SST DELETED")?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} META DELETED")?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} BLOB DELETED")?;
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }

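    // Crash-safety note on the ordering above: new files are fsynced and the
    // `*.del` marker is written *before* CURRENT is updated. If the process
    // dies before CURRENT is written, `load_directory` removes all files with
    // a sequence number greater than CURRENT on the next open, undoing the
    // half-finished commit. If it dies after, the `*.del` file (now covered by
    // CURRENT) is replayed on the next open. Either way the database stays
    // consistent.
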
    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
    /// duplicate keys and separating all key ranges into unique files.
    pub fn full_compact(&self) -> Result<()> {
        self.compact(&CompactConfig {
            min_merge_count: 2,
            optimal_merge_count: usize::MAX,
            max_merge_count: usize::MAX,
            max_merge_bytes: u64::MAX,
            min_merge_duplication_bytes: 0,
            optimal_merge_duplication_bytes: u64::MAX,
            max_merge_segment_count: usize::MAX,
        })?;
        Ok(())
    }

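    // For routine maintenance a partial compaction with finite limits is more
    // appropriate than `full_compact`. The numbers below are purely
    // illustrative, not tuned defaults:
    //
    // ```ignore
    // db.compact(&CompactConfig {
    //     min_merge_count: 2,
    //     optimal_merge_count: 8,
    //     max_merge_count: 32,
    //     max_merge_bytes: 512 * 1024 * 1024,
    //     min_merge_duplication_bytes: 1024 * 1024,
    //     optimal_merge_duplication_bytes: 64 * 1024 * 1024,
    //     max_merge_segment_count: 16,
    // })?;
    // ```
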
    /// Runs a (partial) compaction. Compaction is only performed if the coverage of the SST
    /// files is above the given threshold. The coverage is the average number of SST files that
    /// need to be read to find a key. The config also limits the maximum number of SST files
    /// that are merged at once, which is the main factor for the runtime of a compaction.
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read-only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (only a single write \
                 operation is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }

    /// Internal function to perform a compaction. Groups the SST files by family, selects merge
    /// segments per family, merges them in parallel, and writes one new meta file per family
    /// that had merge work.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }
        }

        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                    })
            })
            .collect::<Vec<_>>();

        let families = ssts_with_ranges
            .iter()
            .map(|s| s.range.family)
            .max()
            .unwrap() as usize
            + 1;

        let mut sst_by_family = Vec::with_capacity(families);
        sst_by_family.resize_with(families, Vec::new);

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        let log_mutex = Mutex::new(());

        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        let mut compact_config = compact_config.clone();
        let merge_jobs = sst_by_family
            .into_iter()
            .enumerate()
            .filter_map(|(family, ssts_with_ranges)| {
                if compact_config.max_merge_segment_count == 0 {
                    return None;
                }
                let (merge_jobs, real_merge_job_size) =
                    get_merge_segments(&ssts_with_ranges, &compact_config);
                compact_config.max_merge_segment_count -= real_merge_job_size;
                Some((family, ssts_with_ranges, merge_jobs))
            })
            .collect::<Vec<_>>();

        let result = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                merge_jobs,
                |(family, ssts_with_ranges, merge_jobs)| {
                    let family = family as u32;

                    if merge_jobs.is_empty() {
                        return Ok(PartialResultPerFamily {
                            new_meta_file: None,
                            new_sst_files: Vec::new(),
                            sst_seq_numbers_to_delete: Vec::new(),
                            blob_seq_numbers_to_delete: Vec::new(),
                            keys_written: 0,
                        });
                    }

                    self.parallel_scheduler.block_in_place(|| {
                        let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
                        let guard = log_mutex.lock();
                        let mut log = self.open_log()?;
                        writeln!(
                            log,
                            "Compaction for family {family} (coverage: {}, overlap: {}, \
                             duplication: {} / {} MiB):",
                            metrics.coverage,
                            metrics.overlap,
                            metrics.duplication,
                            metrics.duplicated_size / 1024 / 1024
                        )?;
                        for job in merge_jobs.iter() {
                            writeln!(log, "  merge")?;
                            for i in job.iter() {
                                let seq = ssts_with_ranges[*i].seq;
                                let (min, max) = ssts_with_ranges[*i].range().into_inner();
                                writeln!(log, "    {seq:08} {min:016x}-{max:016x}")?;
                            }
                        }
                        drop(guard);
                        anyhow::Ok(())
                    })?;

                    // Later we will remove the merged files
                    let sst_seq_numbers_to_delete = merge_jobs
                        .iter()
                        .filter(|l| l.len() > 1)
                        .flat_map(|l| l.iter().copied())
                        .map(|index| ssts_with_ranges[index].seq)
                        .collect::<Vec<_>>();

                    // Merge SST files
                    let span = tracing::trace_span!("merge files");
                    enum PartialMergeResult<'l> {
                        Merged {
                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                            blob_seq_numbers_to_delete: Vec<u32>,
                            keys_written: u64,
                        },
                        Move {
                            seq: u32,
                            meta: StaticSortedFileBuilderMeta<'l>,
                        },
                    }
                    let merge_result = self
                        .parallel_scheduler
                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
                            let _span = span.clone().entered();
                            if indices.len() == 1 {
                                // If we only have one file, we can just move it
                                let index = indices[0];
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta_file = &meta_files[meta_index];
                                let entry = meta_file.entry(index_in_meta);
                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
                                let meta = StaticSortedFileBuilderMeta {
                                    min_hash: entry.min_hash(),
                                    max_hash: entry.max_hash(),
                                    amqf,
                                    key_compression_dictionary_length: entry
                                        .key_compression_dictionary_length(),
                                    block_count: entry.block_count(),
                                    size: entry.size(),
                                    entries: 0,
                                };
                                return Ok(PartialMergeResult::Move {
                                    seq: entry.sequence_number(),
                                    meta,
                                });
                            }

                            fn create_sst_file<'l, S: ParallelScheduler>(
                                parallel_scheduler: &S,
                                entries: &[LookupEntry<'l>],
                                total_key_size: usize,
                                path: &Path,
                                seq: u32,
                            ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                            {
                                let _span = tracing::trace_span!("write merged sst file").entered();
                                let (meta, file) = parallel_scheduler.block_in_place(|| {
                                    write_static_stored_file(
                                        entries,
                                        total_key_size,
                                        &path.join(format!("{seq:08}.sst")),
                                    )
                                })?;
                                Ok((seq, file, meta))
                            }

                            let mut new_sst_files = Vec::new();

                            // Iterate all SST files
                            let iters = indices
                                .iter()
                                .map(|&index| {
                                    let meta_index = ssts_with_ranges[index].meta_index;
                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                    let meta = &meta_files[meta_index];
                                    meta.entry(index_in_meta)
                                        .sst(meta)?
                                        .iter(key_block_cache, value_block_cache)
                                })
                                .collect::<Result<Vec<_>>>()?;

                            let iter = MergeIter::new(iters.into_iter())?;

                            // TODO figure out how to delete blobs when they are no longer
                            // referenced
                            let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                            let mut keys_written = 0;

                            let mut total_key_size = 0;
                            let mut total_value_size = 0;
                            let mut current: Option<LookupEntry<'_>> = None;
                            let mut entries = Vec::new();
                            let mut last_entries = Vec::new();
                            let mut last_entries_total_key_size = 0;
                            for entry in iter {
                                let entry = entry?;

                                // Remove duplicates
                                if let Some(current) = current.take() {
                                    if current.key != entry.key {
                                        let key_size = current.key.len();
                                        let value_size = current.value.uncompressed_size_in_sst();
                                        total_key_size += key_size;
                                        total_value_size += value_size;

                                        if total_key_size + total_value_size
                                            > DATA_THRESHOLD_PER_COMPACTED_FILE
                                            || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
                                        {
                                            let selected_total_key_size =
                                                last_entries_total_key_size;
                                            swap(&mut entries, &mut last_entries);
                                            last_entries_total_key_size = total_key_size - key_size;
                                            total_key_size = key_size;
                                            total_value_size = value_size;

                                            if !entries.is_empty() {
                                                let seq = sequence_number
                                                    .fetch_add(1, Ordering::SeqCst)
                                                    + 1;

                                                keys_written += entries.len() as u64;
                                                new_sst_files.push(create_sst_file(
                                                    &self.parallel_scheduler,
                                                    &entries,
                                                    selected_total_key_size,
                                                    path,
                                                    seq,
                                                )?);

                                                entries.clear();
                                            }
                                        }

                                        entries.push(current);
                                    } else {
                                        // Duplicate key: the entry yielded later overrides this
                                        // value
                                    }
                                }
                                current = Some(entry);
                            }
                            if let Some(entry) = current {
                                total_key_size += entry.key.len();
                                // Obsolete as we no longer need total_value_size
                                // total_value_size += entry.value.uncompressed_size_in_sst();
                                entries.push(entry);
                            }

                            // If we have one set of entries left, write them to a new SST file
                            if last_entries.is_empty() && !entries.is_empty() {
                                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += entries.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    &entries,
                                    total_key_size,
                                    path,
                                    seq,
                                )?);
                            } else
                            // If we have two sets of entries left, merge them and
                            // split them into two SST files, to avoid having a
                            // single SST file that is very small.
                            if !last_entries.is_empty() {
                                last_entries.append(&mut entries);

                                last_entries_total_key_size += total_key_size;

                                let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

                                let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                                let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += part1.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part1,
                                    // We don't know the exact sizes so we estimate them
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq1,
                                )?);

                                keys_written += part2.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part2,
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq2,
                                )?);
                            }
                            Ok(PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written,
                            })
                        })
                        .with_context(|| {
                            format!("Failed to merge database files for family {family}")
                        })?;

                    let Some((sst_files_len, blob_delete_len)) = merge_result
                        .iter()
                        .map(|r| {
                            if let PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written: _,
                            } = r
                            {
                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                            } else {
                                (0, 0)
                            }
                        })
                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                    else {
                        unreachable!()
                    };

                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                    let mut meta_file_builder = MetaFileBuilder::new(family);

                    let mut keys_written = 0;
                    for result in merge_result {
                        match result {
                            PartialMergeResult::Merged {
                                new_sst_files: merged_new_sst_files,
                                blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                                keys_written: merged_keys_written,
                            } => {
                                for (seq, file, meta) in merged_new_sst_files {
                                    meta_file_builder.add(seq, meta);
                                    new_sst_files.push((seq, file));
                                }
                                blob_seq_numbers_to_delete
                                    .extend(merged_blob_seq_numbers_to_delete);
                                keys_written += merged_keys_written;
                            }
                            PartialMergeResult::Move { seq, meta } => {
                                meta_file_builder.add(seq, meta);
                            }
                        }
                    }

                    for &seq in sst_seq_numbers_to_delete.iter() {
                        meta_file_builder.add_obsolete_sst_file(seq);
                    }

                    let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let meta_file = {
                        let _span = tracing::trace_span!("write meta file").entered();
                        self.parallel_scheduler
                            .block_in_place(|| meta_file_builder.write(&self.path, seq))?
                    };

                    Ok(PartialResultPerFamily {
                        new_meta_file: Some((seq, meta_file)),
                        new_sst_files,
                        sst_seq_numbers_to_delete,
                        blob_seq_numbers_to_delete,
                        keys_written,
                    })
                },
            )?;

        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }

    /// Get a value from the database. Returns None if the key is not found. The returned value
    /// might hold onto a block of the database, so it should not be held long-term.
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.amqf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => match result {
                        LookupValue::Deleted => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                            return Ok(None);
                        }
                        LookupValue::Slice { value } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                            return Ok(Some(value));
                        }
                        LookupValue::Blob { sequence_number } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                            let blob = self.read_blob(sequence_number)?;
                            return Ok(Some(blob));
                        }
                    },
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }

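    // The returned `ArcSlice` may keep a decompressed block (and its cache
    // entry) alive, so copy the bytes out if the value needs to outlive the
    // lookup. A sketch, assuming `ArcSlice<u8>` derefs to `[u8]`:
    //
    // ```ignore
    // if let Some(value) = db.get(family, &key)? {
    //     let owned: Vec<u8> = value.to_vec();
    // }
    // ```
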
    /// Returns database statistics.
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            amqf_cache: CacheStatistics::new(&self.amqf_cache),
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }

    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
        Ok(self
            .inner
            .read()
            .meta_files
            .iter()
            .rev()
            .map(|meta_file| {
                let entries = meta_file
                    .entries()
                    .iter()
                    .map(|entry| {
                        let amqf = entry.raw_amqf(meta_file.amqf_data());
                        MetaFileEntryInfo {
                            sequence_number: entry.sequence_number(),
                            min_hash: entry.min_hash(),
                            max_hash: entry.max_hash(),
                            sst_size: entry.size(),
                            amqf_size: entry.amqf_size(),
                            amqf_entries: amqf.len(),
                            key_compression_dictionary_size: entry
                                .key_compression_dictionary_length(),
                            block_count: entry.block_count(),
                        }
                    })
                    .collect();
                MetaFileInfo {
                    sequence_number: meta_file.sequence_number(),
                    family: meta_file.family(),
                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
                    entries,
                }
            })
            .collect())
    }

    /// Shuts down the database. This will print statistics if the `print_stats` feature is
    /// enabled.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
}

pub struct MetaFileInfo {
    pub sequence_number: u32,
    pub family: u32,
    pub obsolete_sst_files: Vec<u32>,
    pub entries: Vec<MetaFileEntryInfo>,
}

pub struct MetaFileEntryInfo {
    pub sequence_number: u32,
    pub min_hash: u64,
    pub max_hash: u64,
    pub amqf_size: u32,
    pub amqf_entries: usize,
    pub sst_size: u64,
    pub key_compression_dictionary_size: u16,
    pub block_count: u16,
}