turbo_persistence/
db.rs

use std::{
    borrow::Cow,
    collections::HashSet,
    fs::{self, File, OpenOptions, ReadDir},
    io::{BufWriter, Write},
    mem::swap,
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::atomic::{AtomicBool, AtomicU32, Ordering},
};

use anyhow::{Context, Result, bail};
use byteorder::{BE, ReadBytesExt, WriteBytesExt};
use jiff::Timestamp;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};

pub use crate::compaction::selector::CompactConfig;
use crate::{
    QueryKey,
    arc_slice::ArcSlice,
    compaction::selector::{Compactable, compute_metrics, get_merge_segments},
    compression::decompress_into_arc,
    constants::{
        AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
        KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
        VALUE_BLOCK_CACHE_SIZE,
    },
    key::{StoreKey, hash_key},
    lookup_entry::{LookupEntry, LookupValue},
    merge_iter::MergeIter,
    meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
    meta_file_builder::MetaFileBuilder,
    parallel_scheduler::ParallelScheduler,
    sst_filter::SstFilter,
    static_sorted_file::{BlockCache, SstLookupResult},
    static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
    write_batch::{FinishResult, WriteBatch},
};

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    pub hit_rate: f32,
    pub fill: f32,
    pub items: usize,
    pub size: u64,
    pub hits: u64,
    pub misses: u64,
}

#[cfg(feature = "stats")]
impl CacheStatistics {
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
            hit_rate: hits as f32 / (hits + misses) as f32,
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    pub meta_files: usize,
    pub sst_files: usize,
    pub key_block_cache: CacheStatistics,
    pub value_block_cache: CacheStatistics,
    pub amqf_cache: CacheStatistics,
    pub hits: u64,
    pub misses: u64,
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}

#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}

/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time,
/// using a single write batch, and allows concurrent reads.
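///
/// A minimal lifecycle sketch (the key type `MyKey`, the values, and the `put` call on
/// `WriteBatch` are illustrative assumptions, not APIs defined in this file):
///
/// ```ignore
/// let db = TurboPersistence::open(PathBuf::from("/tmp/db"))?;
/// let batch = db.write_batch::<MyKey, 1>()?; // 1 = number of key families
/// batch.put(0, my_key, my_value)?; // hypothetical `WriteBatch` API
/// db.commit_write_batch(batch)?;
/// let value = db.get(0, &my_key)?; // read back from family 0
/// db.shutdown()?;
/// ```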
pub struct TurboPersistence<S: ParallelScheduler> {
    parallel_scheduler: S,
    /// The path to the directory where the database is stored
    path: PathBuf,
    /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
    /// no modifications to the database are performed.
    read_only: bool,
    /// The inner state of the database. Writing will update that.
    inner: RwLock<Inner>,
    /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent
    /// write operations.
    active_write_operation: AtomicBool,
    /// A cache for deserialized AMQF filters.
    amqf_cache: AmqfCache,
    /// A cache for decompressed key blocks.
    key_block_cache: BlockCache,
    /// A cache for decompressed value blocks.
    value_block_cache: BlockCache,
    /// Statistics for the database.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}

/// The inner state of the database.
struct Inner {
    /// The list of meta files in the database. This is used to derive the SST files.
    meta_files: Vec<MetaFile>,
    /// The current sequence number for the database.
    current_sequence_number: u32,
}

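/// Describes a set of changes to be committed to the database: the new files to make visible,
/// the sequence numbers of files to delete, the new current sequence number, and the number of
/// keys written (used for logging).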
pub struct CommitOptions {
    new_meta_files: Vec<(u32, File)>,
    new_sst_files: Vec<(u32, File)>,
    new_blob_files: Vec<(u32, File)>,
    sst_seq_numbers_to_delete: Vec<u32>,
    blob_seq_numbers_to_delete: Vec<u32>,
    sequence_number: u32,
    keys_written: u64,
}

impl<S: ParallelScheduler + Default> TurboPersistence<S> {
    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup when the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting files,
    /// so it's fast.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only(path: PathBuf) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, Default::default())
    }
}

impl<S: ParallelScheduler> TurboPersistence<S> {
    fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
        Self {
            parallel_scheduler,
            path,
            read_only,
            inner: RwLock::new(Inner {
                meta_files: Vec::new(),
                current_sequence_number: 0,
            }),
            active_write_operation: AtomicBool::new(false),
            amqf_cache: AmqfCache::with(
                AMQF_CACHE_SIZE as usize / AMQF_AVG_SIZE,
                AMQF_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            key_block_cache: BlockCache::with(
                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
                KEY_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            value_block_cache: BlockCache::with(
                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
                VALUE_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            #[cfg(feature = "stats")]
            stats: TrackedStats::default(),
        }
    }

    /// Open a TurboPersistence database at the given path.
    /// This will read the directory and might perform cleanup when the database was not closed
    /// properly. Cleanup only requires reading a few bytes from a few files and deleting files,
    /// so it's fast.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        let mut db = Self::new(path, false, parallel_scheduler);
        db.open_directory(false)?;
        Ok(db)
    }

    /// Open a TurboPersistence database at the given path in read-only mode.
    /// This will read the directory. No cleanup is performed.
    pub fn open_read_only_with_parallel_scheduler(
        path: PathBuf,
        parallel_scheduler: S,
    ) -> Result<Self> {
        let mut db = Self::new(path, true, parallel_scheduler);
        // Pass `read_only = true` so that no cleanup is performed on open.
        db.open_directory(true)?;
        Ok(db)
    }

    /// Performs the initial check on the database directory.
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }

    /// Creates the directory and initializes it.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }

    /// Initializes the directory by creating the CURRENT file.
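    /// The CURRENT file holds a single big-endian u32: the highest committed sequence number of
    /// the database.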
    fn init_directory(&mut self) -> Result<()> {
        let mut current = File::create(self.path.join("CURRENT"))?;
        current.write_u32::<BE>(0)?;
        current.flush()?;
        Ok(())
    }

    /// Loads an existing database directory and performs cleanup if necessary.
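    ///
    /// Recovery rules, derived from the on-disk layout: files with a sequence number greater
    /// than CURRENT are leftovers of an interrupted commit and are removed; a `*.del` file is a
    /// list of big-endian u32 sequence numbers whose files are deleted here, and the `*.del`
    /// file itself is only removed once none of the listed files exist anymore.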
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    // Remove the files that are marked for deletion
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // ignore blobs and sst, they are read when needed
                        }
                        _ => {
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read
                    }
                    Some("LOG") => {
                        // Ignored, write-only
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }

    /// Reads and decompresses a blob file. This is not backed by any cache.
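    /// The on-disk format is a big-endian u32 uncompressed length followed by the compressed
    /// payload.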
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()?;

        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
        Ok(ArcSlice::from(buffer))
    }

    /// Returns true if the database is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.read().meta_files.is_empty()
    }

    /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a
    /// time. The WriteBatch needs to be committed with [`TurboPersistence::commit_write_batch`].
    /// Note that the WriteBatch might start writing data to disk while it is being filled with
    /// data. This data will only become visible after the WriteBatch is committed.
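    ///
    /// A minimal sketch, assuming a single family and a `put` method on `WriteBatch` (which is
    /// not defined in this file):
    ///
    /// ```ignore
    /// let batch = db.write_batch::<MyKey, 1>()?;
    /// batch.put(0, my_key, my_value)?; // hypothetical `WriteBatch` API
    /// db.commit_write_batch(batch)?;
    /// ```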
    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
    ) -> Result<WriteBatch<K, S, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (only a single write \
                 operation is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        Ok(WriteBatch::new(
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
        ))
    }

    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }

    /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
    /// visible to readers.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish()?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }

    /// fsyncs the new files, updates the CURRENT file, and updates the database state to include
    /// the new files.
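    ///
    /// The ordering makes the commit crash-safe: new files are synced first, then a `*.del` file
    /// listing obsolete files is written and synced, then CURRENT is updated and synced, and only
    /// afterwards are the obsolete files actually removed.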
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            mut new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;

        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        (seq, range.min_hash, range.max_hash, size)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // apply_and_get_remove needs to run in reverse order
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // Write *.del file, marking the selected files as to delete
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                for (seq, family, ssts, obsolete) in new_meta_info {
                    writeln!(log, "{seq:08} META family:{family}")?;
                    for (seq, min, max, size) in ssts {
                        writeln!(
                            log,
                            "  {seq:08} SST  {min:016x}-{max:016x} {} MiB",
                            size / 1024 / 1024
                        )?;
                    }
                    for seq in obsolete {
                        writeln!(log, "  {seq:08} OBSOLETE SST")?;
                    }
                }
                new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_sst_files.iter() {
                    writeln!(log, "{seq:08} NEW SST")?;
                }
                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_blob_files.iter() {
                    writeln!(log, "{seq:08} NEW BLOB")?;
                }
                for seq in sst_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} SST DELETED")?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} META DELETED")?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} BLOB DELETED")?;
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }

    /// Runs a full compaction on the database. This will rewrite all SST files, removing all
    /// duplicate keys and separating all key ranges into unique files.
    pub fn full_compact(&self) -> Result<()> {
        self.compact(&CompactConfig {
            min_merge_count: 2,
            optimal_merge_count: usize::MAX,
            max_merge_count: usize::MAX,
            max_merge_bytes: u64::MAX,
            min_merge_duplication_bytes: 0,
            optimal_merge_duplication_bytes: u64::MAX,
            max_merge_segment_count: usize::MAX,
        })?;
        Ok(())
    }

    /// Runs a (partial) compaction. Compaction is only performed if the coverage of the SST
    /// files is above the given threshold. The coverage is the average number of SST files that
    /// need to be read to find a key. The config also limits the maximum number of SST files
    /// merged at once, which is the main factor in the runtime of the compaction.
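    ///
    /// A sketch with illustrative values (the field names come from [`CompactConfig`]; the
    /// thresholds are workload-dependent assumptions):
    ///
    /// ```ignore
    /// db.compact(&CompactConfig {
    ///     min_merge_count: 2,
    ///     optimal_merge_count: 8,
    ///     max_merge_count: 32,
    ///     max_merge_bytes: 512 * 1024 * 1024,
    ///     min_merge_duplication_bytes: 1024 * 1024,
    ///     optimal_merge_duplication_bytes: 64 * 1024 * 1024,
    ///     max_merge_segment_count: 16,
    /// })?;
    /// ```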
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read-only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (only a single write \
                 operation is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }

    /// Internal function to perform a compaction.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }
        }

        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                    })
            })
            .collect::<Vec<_>>();

        let families = ssts_with_ranges
            .iter()
            .map(|s| s.range.family)
            .max()
            .unwrap() as usize
            + 1;

        let mut sst_by_family = Vec::with_capacity(families);
        sst_by_family.resize_with(families, Vec::new);

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        let log_mutex = Mutex::new(());

        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        let mut compact_config = compact_config.clone();
        let merge_jobs = sst_by_family
            .into_iter()
            .enumerate()
            .filter_map(|(family, ssts_with_ranges)| {
                if compact_config.max_merge_segment_count == 0 {
                    return None;
                }
                let (merge_jobs, real_merge_job_size) =
                    get_merge_segments(&ssts_with_ranges, &compact_config);
                compact_config.max_merge_segment_count -= real_merge_job_size;
                Some((family, ssts_with_ranges, merge_jobs))
            })
            .collect::<Vec<_>>();

        let result = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                merge_jobs,
                |(family, ssts_with_ranges, merge_jobs)| {
                    let family = family as u32;

                    if merge_jobs.is_empty() {
                        return Ok(PartialResultPerFamily {
                            new_meta_file: None,
                            new_sst_files: Vec::new(),
                            sst_seq_numbers_to_delete: Vec::new(),
                            blob_seq_numbers_to_delete: Vec::new(),
                            keys_written: 0,
                        });
                    }

                    self.parallel_scheduler.block_in_place(|| {
                        let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
                        let guard = log_mutex.lock();
                        let mut log = self.open_log()?;
                        writeln!(
                            log,
                            "Compaction for family {family} (coverage: {}, overlap: {}, \
                             duplication: {} / {} MiB):",
                            metrics.coverage,
                            metrics.overlap,
                            metrics.duplication,
                            metrics.duplicated_size / 1024 / 1024
                        )?;
                        for job in merge_jobs.iter() {
                            writeln!(log, "  merge")?;
                            for i in job.iter() {
                                let seq = ssts_with_ranges[*i].seq;
                                let (min, max) = ssts_with_ranges[*i].range().into_inner();
                                writeln!(log, "    {seq:08} {min:016x}-{max:016x}")?;
                            }
                        }
                        drop(guard);
                        anyhow::Ok(())
                    })?;

                    // Later we will remove the merged files
                    let sst_seq_numbers_to_delete = merge_jobs
                        .iter()
                        .filter(|l| l.len() > 1)
                        .flat_map(|l| l.iter().copied())
                        .map(|index| ssts_with_ranges[index].seq)
                        .collect::<Vec<_>>();

                    // Merge SST files
                    let span = tracing::trace_span!("merge files");
                    enum PartialMergeResult<'l> {
                        Merged {
                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                            blob_seq_numbers_to_delete: Vec<u32>,
                            keys_written: u64,
                        },
                        Move {
                            seq: u32,
                            meta: StaticSortedFileBuilderMeta<'l>,
                        },
                    }
                    let merge_result = self
                        .parallel_scheduler
                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
                            let _span = span.clone().entered();
                            if indices.len() == 1 {
                                // If we only have one file, we can just move it
                                let index = indices[0];
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta_file = &meta_files[meta_index];
                                let entry = meta_file.entry(index_in_meta);
                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
                                let meta = StaticSortedFileBuilderMeta {
                                    min_hash: entry.min_hash(),
                                    max_hash: entry.max_hash(),
                                    amqf,
                                    key_compression_dictionary_length: entry
                                        .key_compression_dictionary_length(),
                                    block_count: entry.block_count(),
                                    size: entry.size(),
                                    entries: 0,
                                };
                                return Ok(PartialMergeResult::Move {
                                    seq: entry.sequence_number(),
                                    meta,
                                });
                            }

                            fn create_sst_file<'l, S: ParallelScheduler>(
                                parallel_scheduler: &S,
                                entries: &[LookupEntry<'l>],
                                total_key_size: usize,
                                path: &Path,
                                seq: u32,
                            ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                            {
                                let _span = tracing::trace_span!("write merged sst file").entered();
                                let (meta, file) = parallel_scheduler.block_in_place(|| {
                                    write_static_stored_file(
                                        entries,
                                        total_key_size,
                                        &path.join(format!("{seq:08}.sst")),
                                    )
                                })?;
                                Ok((seq, file, meta))
                            }

                            let mut new_sst_files = Vec::new();

                            // Iterate all SST files
                            let iters = indices
                                .iter()
                                .map(|&index| {
                                    let meta_index = ssts_with_ranges[index].meta_index;
                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                    let meta = &meta_files[meta_index];
                                    meta.entry(index_in_meta)
                                        .sst(meta)?
                                        .iter(key_block_cache, value_block_cache)
                                })
                                .collect::<Result<Vec<_>>>()?;

                            let iter = MergeIter::new(iters.into_iter())?;

                            // TODO figure out how to delete blobs when they are no longer
                            // referenced
                            let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                            let mut keys_written = 0;

                            let mut total_key_size = 0;
                            let mut total_value_size = 0;
                            let mut current: Option<LookupEntry<'_>> = None;
                            let mut entries = Vec::new();
                            let mut last_entries = Vec::new();
                            let mut last_entries_total_key_size = 0;
                            for entry in iter {
                                let entry = entry?;

                                // Remove duplicates
                                if let Some(current) = current.take() {
                                    if current.key != entry.key {
                                        let key_size = current.key.len();
                                        let value_size = current.value.uncompressed_size_in_sst();
                                        total_key_size += key_size;
                                        total_value_size += value_size;

                                        if total_key_size + total_value_size
                                            > DATA_THRESHOLD_PER_COMPACTED_FILE
                                            || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
                                        {
                                            let selected_total_key_size =
                                                last_entries_total_key_size;
                                            swap(&mut entries, &mut last_entries);
                                            last_entries_total_key_size = total_key_size - key_size;
                                            total_key_size = key_size;
                                            total_value_size = value_size;

                                            if !entries.is_empty() {
                                                let seq = sequence_number
                                                    .fetch_add(1, Ordering::SeqCst)
                                                    + 1;

                                                keys_written += entries.len() as u64;
                                                new_sst_files.push(create_sst_file(
                                                    &self.parallel_scheduler,
                                                    &entries,
                                                    selected_total_key_size,
                                                    path,
                                                    seq,
                                                )?);

                                                entries.clear();
                                            }
                                        }

                                        entries.push(current);
                                    } else {
                                        // Override value
                                    }
                                }
                                current = Some(entry);
                            }
                            if let Some(entry) = current {
                                total_key_size += entry.key.len();
                                // Obsolete as we no longer need total_value_size
                                // total_value_size += entry.value.uncompressed_size_in_sst();
                                entries.push(entry);
                            }

                            // If we have one set of entries left, write them to a new SST file
                            if last_entries.is_empty() && !entries.is_empty() {
                                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += entries.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    &entries,
                                    total_key_size,
                                    path,
                                    seq,
                                )?);
                            } else
                            // If we have two sets of entries left, merge them and
                            // split the result into two SST files, to avoid having a
                            // single SST file that is very small.
                            if !last_entries.is_empty() {
                                last_entries.append(&mut entries);

                                last_entries_total_key_size += total_key_size;

                                let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

                                let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                                let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += part1.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part1,
                                    // We don't know the exact sizes so we estimate them
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq1,
                                )?);

                                keys_written += part2.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part2,
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq2,
                                )?);
                            }
                            Ok(PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written,
                            })
                        })
                        .with_context(|| {
                            format!("Failed to merge database files for family {family}")
                        })?;

                    let Some((sst_files_len, blob_delete_len)) = merge_result
                        .iter()
                        .map(|r| {
                            if let PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written: _,
                            } = r
                            {
                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                            } else {
                                (0, 0)
                            }
                        })
                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                    else {
                        unreachable!()
                    };

                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                    let mut meta_file_builder = MetaFileBuilder::new(family);

                    let mut keys_written = 0;
                    for result in merge_result {
                        match result {
                            PartialMergeResult::Merged {
                                new_sst_files: merged_new_sst_files,
                                blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                                keys_written: merged_keys_written,
                            } => {
                                for (seq, file, meta) in merged_new_sst_files {
                                    meta_file_builder.add(seq, meta);
                                    new_sst_files.push((seq, file));
                                }
                                blob_seq_numbers_to_delete
                                    .extend(merged_blob_seq_numbers_to_delete);
                                keys_written += merged_keys_written;
                            }
                            PartialMergeResult::Move { seq, meta } => {
                                meta_file_builder.add(seq, meta);
                            }
                        }
                    }

                    for &seq in sst_seq_numbers_to_delete.iter() {
                        meta_file_builder.add_obsolete_sst_file(seq);
                    }

                    let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let meta_file = {
                        let _span = tracing::trace_span!("write meta file").entered();
                        self.parallel_scheduler
                            .block_in_place(|| meta_file_builder.write(&self.path, seq))?
                    };

                    Ok(PartialResultPerFamily {
                        new_meta_file: Some((seq, meta_file)),
                        new_sst_files,
                        sst_seq_numbers_to_delete,
                        blob_seq_numbers_to_delete,
                        keys_written,
                    })
                },
            )?;

        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }

    /// Get a value from the database. Returns None if the key is not found. The returned value
    /// might hold onto a block of the database, so it should not be held long-term.
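    ///
    /// A minimal sketch (`my_key` stands for any type implementing `QueryKey`):
    ///
    /// ```ignore
    /// if let Some(value) = db.get(0, &my_key)? {
    ///     // Copy the bytes out if they must outlive this call site, since
    ///     // the returned slice may pin a cached block.
    ///     let owned: Vec<u8> = value.to_vec();
    /// }
    /// ```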
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.amqf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => match result {
                        LookupValue::Deleted => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                            return Ok(None);
                        }
                        LookupValue::Slice { value } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                            return Ok(Some(value));
                        }
                        LookupValue::Blob { sequence_number } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                            let blob = self.read_blob(sequence_number)?;
                            return Ok(Some(blob));
                        }
                    },
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }

    /// Returns database statistics.
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            amqf_cache: CacheStatistics::new(&self.amqf_cache),
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }

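    /// Returns information about all meta files and their SST entries, ordered from newest to
    /// oldest. This is mainly useful for debugging and introspection.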
    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
        Ok(self
            .inner
            .read()
            .meta_files
            .iter()
            .rev()
            .map(|meta_file| {
                let entries = meta_file
                    .entries()
                    .iter()
                    .map(|entry| {
                        let amqf = entry.raw_amqf(meta_file.amqf_data());
                        MetaFileEntryInfo {
                            sequence_number: entry.sequence_number(),
                            min_hash: entry.min_hash(),
                            max_hash: entry.max_hash(),
                            sst_size: entry.size(),
                            amqf_size: entry.amqf_size(),
                            amqf_entries: amqf.len(),
                            key_compression_dictionary_size: entry
                                .key_compression_dictionary_length(),
                            block_count: entry.block_count(),
                        }
                    })
                    .collect();
                MetaFileInfo {
                    sequence_number: meta_file.sequence_number(),
                    family: meta_file.family(),
                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
                    entries,
                }
            })
            .collect())
    }

    /// Shuts down the database. This will print statistics if the `print_stats` feature is enabled.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
}

pub struct MetaFileInfo {
    pub sequence_number: u32,
    pub family: u32,
    pub obsolete_sst_files: Vec<u32>,
    pub entries: Vec<MetaFileEntryInfo>,
}

pub struct MetaFileEntryInfo {
    pub sequence_number: u32,
    pub min_hash: u64,
    pub max_hash: u64,
    pub amqf_size: u32,
    pub amqf_entries: usize,
    pub sst_size: u64,
    pub key_compression_dictionary_size: u16,
    pub block_count: u16,
}