use std::{
    any::{Any, TypeId},
    borrow::Cow,
    collections::HashSet,
    fs::{self, File, OpenOptions, ReadDir},
    io::{BufWriter, Write},
    mem::{MaybeUninit, swap, transmute},
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU32, Ordering},
    },
};

use anyhow::{Context, Result, bail};
use byteorder::{BE, ReadBytesExt, WriteBytesExt};
use jiff::Timestamp;
use lzzzz::lz4::decompress;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use tracing::Span;

pub use crate::compaction::selector::CompactConfig;
use crate::{
    QueryKey,
    arc_slice::ArcSlice,
    compaction::selector::{Compactable, compute_metrics, get_merge_segments},
    constants::{
        AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
        KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
        VALUE_BLOCK_CACHE_SIZE,
    },
    key::{StoreKey, hash_key},
    lookup_entry::{LookupEntry, LookupValue},
    merge_iter::MergeIter,
    meta_file::{AqmfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
    meta_file_builder::MetaFileBuilder,
    sst_filter::SstFilter,
    static_sorted_file::{BlockCache, SstLookupResult},
    static_sorted_file_builder::{StaticSortedFileBuilder, StaticSortedFileBuilderMeta},
    write_batch::{FinishResult, WriteBatch},
};

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    pub hit_rate: f32,
    pub fill: f32,
    pub items: usize,
    pub size: u64,
    pub hits: u64,
    pub misses: u64,
}

#[cfg(feature = "stats")]
impl CacheStatistics {
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        Self {
            hit_rate: hits as f32 / (hits + misses) as f32,
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}

#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    pub meta_files: usize,
    pub sst_files: usize,
    pub key_block_cache: CacheStatistics,
    pub value_block_cache: CacheStatistics,
    pub aqmf_cache: CacheStatistics,
    pub hits: u64,
    pub misses: u64,
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_aqmf: u64,
    pub miss_key: u64,
}

#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_aqmf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}

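/// A persistent key-value database. All writes go through [`WriteBatch`]es or
/// compactions and only become visible once they have been committed.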
pub struct TurboPersistence {
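    /// The path to the directory that contains all database files.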
    path: PathBuf,
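    /// When true, no writes or cleanup are performed against the directory.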
    read_only: bool,
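    /// The mutable state: loaded meta files and the current sequence number.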
    inner: RwLock<Inner>,
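    /// A finished write batch kept around so its allocations can be reused by
    /// the next write batch of the same type.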
    idle_write_batch: Mutex<Option<(TypeId, Box<dyn Any + Send + Sync>)>>,
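    /// Set while a write batch or compaction is running; only a single write
    /// operation is allowed at a time.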
    active_write_operation: AtomicBool,
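    /// Cache for deserialized AQMF (approximate membership query filter) data.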
    aqmf_cache: AqmfCache,
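    /// Cache for key blocks of the static sorted files.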
    key_block_cache: BlockCache,
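    /// Cache for value blocks of the static sorted files.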
    value_block_cache: BlockCache,
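    /// Lookup statistics, tracked only when the `stats` feature is enabled.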
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}

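/// The mutable state of the database, guarded by the `inner` lock.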
struct Inner {
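    /// All loaded meta files, ordered by sequence number (oldest first).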
    meta_files: Vec<MetaFile>,
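    /// The sequence number of the latest commit.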
    current_sequence_number: u32,
}

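/// Everything that makes up a single commit: files to add, files to delete,
/// the new sequence number, and the number of keys written (for logging).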
pub struct CommitOptions {
    new_meta_files: Vec<(u32, File)>,
    new_sst_files: Vec<(u32, File)>,
    new_blob_files: Vec<(u32, File)>,
    sst_seq_numbers_to_delete: Vec<u32>,
    blob_seq_numbers_to_delete: Vec<u32>,
    sequence_number: u32,
    keys_written: u64,
}

impl TurboPersistence {
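    /// Creates the instance without touching the file system.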
    fn new(path: PathBuf, read_only: bool) -> Self {
        Self {
            path,
            read_only,
            inner: RwLock::new(Inner {
                meta_files: Vec::new(),
                current_sequence_number: 0,
            }),
            idle_write_batch: Mutex::new(None),
            active_write_operation: AtomicBool::new(false),
            aqmf_cache: AqmfCache::with(
                AQMF_CACHE_SIZE as usize / AQMF_AVG_SIZE,
                AQMF_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            key_block_cache: BlockCache::with(
                KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
                KEY_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            value_block_cache: BlockCache::with(
                VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
                VALUE_BLOCK_CACHE_SIZE,
                Default::default(),
                Default::default(),
                Default::default(),
            ),
            #[cfg(feature = "stats")]
            stats: TrackedStats::default(),
        }
    }

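    /// Opens the database at the given path, creating and initializing the
    /// directory when it does not exist yet. Cleans up files left behind by
    /// interrupted commits.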
    pub fn open(path: PathBuf) -> Result<Self> {
        let mut db = Self::new(path, false);
        db.open_directory(false)?;
        Ok(db)
    }

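    /// Opens the database at the given path in read-only mode. The directory
    /// is never modified; opening fails if it does not exist.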
    pub fn open_read_only(path: PathBuf) -> Result<Self> {
        let mut db = Self::new(path, true);
        db.open_directory(true)?;
        Ok(db)
    }

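    /// Reads the directory, falling back to initializing it when it is
    /// missing or empty (unless opened read-only).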
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }

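    /// Creates the directory and initializes it.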
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }

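    /// Initializes a fresh directory by writing sequence number 0 to CURRENT.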
    fn init_directory(&mut self) -> Result<()> {
        let mut current = File::create(self.path.join("CURRENT"))?;
        current.write_u32::<BE>(0)?;
        current.flush()?;
        Ok(())
    }

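    /// Loads the database from the directory: reads CURRENT, applies pending
    /// deletions from `.del` files, removes files with uncommitted sequence
    /// numbers, and opens the remaining meta files. Returns false when the
    /// directory holds no usable database.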
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
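                            // Data files are opened lazily via the meta files,
                            // so there is nothing to do here.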
315 }
                        _ => {
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
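                        // Already read above; keep it.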
330 }
                    Some("LOG") => {
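                        // The human-readable commit log; keep it.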
333 }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let span = Span::current();
        let mut meta_files = meta_files
            .into_par_iter()
            .with_min_len(1)
            .map(|seq| {
                let _span = span.enter();
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })
            .collect::<Result<Vec<MetaFile>>>()?;

        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }

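    /// Reads the blob file with the given sequence number into memory,
    /// decompressing it from LZ4.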
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()? as usize;

        let buffer = Arc::new_zeroed_slice(uncompressed_length);
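        // SAFETY: Zero-initialized bytes are a valid `[u8]`.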
        let mut buffer = unsafe { transmute::<Arc<[MaybeUninit<u8>]>, Arc<[u8]>>(buffer) };
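        // SAFETY: The `Arc` was just created, so no other references exist.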
        let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
        decompress(compressed, decompressed)?;
        Ok(ArcSlice::from(buffer))
    }

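    /// Returns true if the database contains no data.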
    pub fn is_empty(&self) -> bool {
        self.inner.read().meta_files.is_empty()
    }

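    /// Starts a new write batch. Only a single write operation (write batch
    /// or compaction) may be active at a time; a previously finished batch of
    /// the same type is reused when available.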
    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
    ) -> Result<WriteBatch<K, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (only a single write \
                 operation is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        if let Some((ty, any)) = self.idle_write_batch.lock().take()
            && ty == TypeId::of::<WriteBatch<K, FAMILIES>>()
        {
            let mut write_batch = *any.downcast::<WriteBatch<K, FAMILIES>>().unwrap();
            write_batch.reset(current);
            return Ok(write_batch);
        }
        Ok(WriteBatch::new(self.path.clone(), current))
    }

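    /// Opens the LOG file for appending.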
    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }

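    /// Finishes a write batch and commits its files, making the written keys
    /// visible to readers. The batch is kept for reuse by the next writer.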
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
        mut write_batch: WriteBatch<K, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish()?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        self.active_write_operation.store(false, Ordering::Release);
        self.idle_write_batch.lock().replace((
            TypeId::of::<WriteBatch<K, FAMILIES>>(),
            Box::new(write_batch),
        ));
        Ok(())
    }

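    /// Persists a commit: fsyncs the new files, applies the SST filter,
    /// writes a `.del` file listing obsolete files, bumps CURRENT, deletes
    /// the obsolete files, and appends a summary to LOG.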
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            mut new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        let mut new_meta_files = new_meta_files
            .into_par_iter()
            .with_min_len(1)
            .map(|(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })
            .collect::<Result<Vec<_>>>()?;

        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        for (_, file) in new_sst_files.iter() {
            file.sync_all()?;
        }
        for (_, file) in new_blob_files.iter() {
            file.sync_all()?;
        }

        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        (seq, range.min_hash, range.max_hash, size)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            inner.meta_files.reverse();
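            // Walk newest-first (hence the reverse) and drop meta files whose
            // SST files have all become obsolete.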
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

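        // Record all files that are about to be deleted in a `.del` file so
        // that an interrupted deletion can be completed on the next open.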
        if has_delete_file {
            sst_seq_numbers_to_delete.sort_unstable();
            meta_seq_numbers_to_delete.sort_unstable();
            blob_seq_numbers_to_delete.sort_unstable();
            let mut buf = Vec::with_capacity(
                (sst_seq_numbers_to_delete.len()
                    + meta_seq_numbers_to_delete.len()
                    + blob_seq_numbers_to_delete.len())
                    * size_of::<u32>(),
            );
            for seq in sst_seq_numbers_to_delete.iter() {
                buf.write_u32::<BE>(*seq)?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                buf.write_u32::<BE>(*seq)?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                buf.write_u32::<BE>(*seq)?;
            }
            let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
            file.write_all(&buf)?;
            file.sync_all()?;
        }

        let mut current_file = OpenOptions::new()
            .write(true)
            .truncate(false)
            .read(false)
            .open(self.path.join("CURRENT"))?;
        current_file.write_u32::<BE>(seq)?;
        current_file.sync_all()?;

        for seq in sst_seq_numbers_to_delete.iter() {
            fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
        }
        for seq in meta_seq_numbers_to_delete.iter() {
            fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
        }
        for seq in blob_seq_numbers_to_delete.iter() {
            fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
        }

        {
            let mut log = self.open_log()?;
            writeln!(log, "Time {time}")?;
            let span = time.until(Timestamp::now())?;
            writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
            for (seq, family, ssts, obsolete) in new_meta_info {
                writeln!(log, "{seq:08} META family:{family}")?;
                for (seq, min, max, size) in ssts {
                    writeln!(
                        log,
                        "  {seq:08} SST {min:016x}-{max:016x} {} MiB",
                        size / 1024 / 1024
                    )?;
                }
                for seq in obsolete {
                    writeln!(log, "  {seq:08} OBSOLETE SST")?;
                }
            }
            new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
            for (seq, _) in new_sst_files.iter() {
                writeln!(log, "{seq:08} NEW SST")?;
            }
            new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
            for (seq, _) in new_blob_files.iter() {
                writeln!(log, "{seq:08} NEW BLOB")?;
            }
            for seq in sst_seq_numbers_to_delete.iter() {
                writeln!(log, "{seq:08} SST DELETED")?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                writeln!(log, "{seq:08} META DELETED")?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                writeln!(log, "{seq:08} BLOB DELETED")?;
            }
        }

        Ok(())
    }

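    /// Fully compacts the database by merging everything that can be merged,
    /// using a [`CompactConfig`] with all limits maxed out.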
    pub fn full_compact(&self) -> Result<()> {
        self.compact(&CompactConfig {
            min_merge_count: 2,
            optimal_merge_count: usize::MAX,
            max_merge_count: usize::MAX,
            max_merge_bytes: u64::MAX,
            min_merge_duplication_bytes: 0,
            optimal_merge_duplication_bytes: u64::MAX,
            max_merge_segment_count: usize::MAX,
        })?;
        Ok(())
    }

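    /// Runs a single compaction pass constrained by the given
    /// [`CompactConfig`], then commits the result.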
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<()> {
        if self.read_only {
            bail!("Compaction is not allowed on a read-only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (only a single write \
                 operation is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        if !new_meta_files.is_empty() {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(())
    }

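    /// Performs the actual compaction: groups the SST files by family,
    /// selects merge segments per family, merges them in parallel, and
    /// collects the resulting files and obsolete sequence numbers.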
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }
        }

        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                    })
            })
            .collect::<Vec<_>>();

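        // Families are dense indices, so the highest family number determines
        // how many buckets are needed.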
        let families = ssts_with_ranges
            .iter()
            .map(|s| s.range.family)
            .max()
            .unwrap() as usize
            + 1;

        let mut sst_by_family = Vec::with_capacity(families);
        sst_by_family.resize_with(families, Vec::new);

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        let log_mutex = Mutex::new(());
        let span = Span::current();

        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        let result = sst_by_family
            .into_par_iter()
            .with_min_len(1)
            .enumerate()
            .map(|(family, ssts_with_ranges)| {
                let family = family as u32;
                let _span = span.clone().entered();

                let merge_jobs = get_merge_segments(&ssts_with_ranges, compact_config);

                if merge_jobs.is_empty() {
                    return Ok(PartialResultPerFamily {
                        new_meta_file: None,
                        new_sst_files: Vec::new(),
                        sst_seq_numbers_to_delete: Vec::new(),
                        blob_seq_numbers_to_delete: Vec::new(),
                        keys_written: 0,
                    });
                }

                {
                    let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
                    let guard = log_mutex.lock();
                    let mut log = self.open_log()?;
                    writeln!(
                        log,
                        "Compaction for family {family} (coverage: {}, overlap: {}, duplication: \
                         {} / {} MiB):",
                        metrics.coverage,
                        metrics.overlap,
                        metrics.duplication,
                        metrics.duplicated_size / 1024 / 1024
                    )?;
                    for job in merge_jobs.iter() {
                        writeln!(log, "  merge")?;
                        for i in job.iter() {
                            let seq = ssts_with_ranges[*i].seq;
                            let (min, max) = ssts_with_ranges[*i].range().into_inner();
                            writeln!(log, "    {seq:08} {min:016x}-{max:016x}")?;
                        }
                    }
                    drop(guard);
                }

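                // Every SST file that takes part in a real merge (more than
                // one input) becomes obsolete once the merge is committed.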
                let sst_seq_numbers_to_delete = merge_jobs
                    .iter()
                    .filter(|l| l.len() > 1)
                    .flat_map(|l| l.iter().copied())
                    .map(|index| ssts_with_ranges[index].seq)
                    .collect::<Vec<_>>();

                let span = tracing::trace_span!("merge files");
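                // The result of a single merge job: either freshly written SST
                // files, or a single input file that is moved into the new
                // meta file unchanged.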
                enum PartialMergeResult<'l> {
                    Merged {
                        new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                        blob_seq_numbers_to_delete: Vec<u32>,
                        keys_written: u64,
                    },
                    Move {
                        seq: u32,
                        meta: StaticSortedFileBuilderMeta<'l>,
                    },
                }
                let merge_result = merge_jobs
                    .into_par_iter()
                    .with_min_len(1)
                    .map(|indices| {
                        let _span = span.clone().entered();
                        if indices.len() == 1 {
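                            // A single file is not rewritten; its existing
                            // metadata is copied into the new meta file.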
                            let index = indices[0];
                            let meta_index = ssts_with_ranges[index].meta_index;
                            let index_in_meta = ssts_with_ranges[index].index_in_meta;
                            let meta_file = &meta_files[meta_index];
                            let entry = meta_file.entry(index_in_meta);
                            let aqmf = Cow::Borrowed(entry.raw_aqmf(meta_file.aqmf_data()));
                            let meta = StaticSortedFileBuilderMeta {
                                min_hash: entry.min_hash(),
                                max_hash: entry.max_hash(),
                                aqmf,
                                key_compression_dictionary_length: entry
                                    .key_compression_dictionary_length(),
                                value_compression_dictionary_length: entry
                                    .value_compression_dictionary_length(),
                                block_count: entry.block_count(),
                                size: entry.size(),
                                entries: 0,
                            };
                            return Ok(PartialMergeResult::Move {
                                seq: entry.sequence_number(),
                                meta,
                            });
                        }

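                        // Writes one SST file from the given sorted entries and
                        // returns its sequence number, file handle, and builder
                        // metadata.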
                        fn create_sst_file(
                            entries: &[LookupEntry],
                            total_key_size: usize,
                            total_value_size: usize,
                            path: &Path,
                            seq: u32,
                        ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                        {
                            let _span = tracing::trace_span!("write merged sst file").entered();
                            let builder = StaticSortedFileBuilder::new(
                                entries,
                                total_key_size,
                                total_value_size,
                            )?;
                            let (meta, file) =
                                builder.write(&path.join(format!("{seq:08}.sst")))?;
                            Ok((seq, file, meta))
                        }

                        let mut new_sst_files = Vec::new();

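                        // Open an iterator over every input SST file of this
                        // merge job.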
                        let iters = indices
                            .iter()
                            .map(|&index| {
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta = &meta_files[meta_index];
                                meta.entry(index_in_meta)
                                    .sst(meta)?
                                    .iter(key_block_cache, value_block_cache)
                            })
                            .collect::<Result<Vec<_>>>()?;

                        let iter = MergeIter::new(iters.into_iter())?;

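                        // Blob files are not rewritten during compaction, so
                        // none are scheduled for deletion here.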
                        let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                        let mut keys_written = 0;

                        let mut total_key_size = 0;
                        let mut total_value_size = 0;
                        let mut current: Option<LookupEntry> = None;
                        let mut entries = Vec::new();
                        let mut last_entries = Vec::new();
                        let mut last_entries_total_sizes = (0, 0);
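                        // Stream the merged entries in key order, dropping
                        // duplicate keys and cutting a new SST file whenever a
                        // size or entry-count threshold is exceeded. The most
                        // recently completed batch is held back in
                        // `last_entries` so the final, possibly small, batch
                        // can be balanced against it below.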
                        for entry in iter {
                            let entry = entry?;

                            if let Some(current) = current.take() {
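                                // Push the buffered entry once the key changes;
                                // entries with an already-seen key are dropped.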
                                if current.key != entry.key {
                                    let key_size = current.key.len();
                                    let value_size = current.value.size_in_sst();
                                    total_key_size += key_size;
                                    total_value_size += value_size;

                                    if total_key_size + total_value_size
                                        > DATA_THRESHOLD_PER_COMPACTED_FILE
                                        || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
                                    {
                                        let (selected_total_key_size, selected_total_value_size) =
                                            last_entries_total_sizes;
                                        swap(&mut entries, &mut last_entries);
                                        last_entries_total_sizes = (
                                            total_key_size - key_size,
                                            total_value_size - value_size,
                                        );
                                        total_key_size = key_size;
                                        total_value_size = value_size;

                                        if !entries.is_empty() {
                                            let seq =
                                                sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                            keys_written += entries.len() as u64;
                                            new_sst_files.push(create_sst_file(
                                                &entries,
                                                selected_total_key_size,
                                                selected_total_value_size,
                                                path,
                                                seq,
                                            )?);

                                            entries.clear();
                                        }
                                    }

                                    entries.push(current);
                                } else {
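                                    // The buffered entry has the same key as
                                    // the one just read; drop it so only a
                                    // single entry per key survives.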
994 }
                            }
                            current = Some(entry);
                        }
                        if let Some(entry) = current {
                            total_key_size += entry.key.len();
                            total_value_size += entry.value.size_in_sst();
                            entries.push(entry);
                        }

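                        // Write out what is left: either a single remainder
                        // file, or the held-back batch plus the remainder split
                        // into two files.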
                        if last_entries.is_empty() && !entries.is_empty() {
                            let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                            keys_written += entries.len() as u64;
                            new_sst_files.push(create_sst_file(
                                &entries,
                                total_key_size,
                                total_value_size,
                                path,
                                seq,
                            )?);
                        } else
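                        // A batch is still held back: combine it with the
                        // remainder and split the result into two evenly sized
                        // files instead of writing one huge or one tiny file.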
                        if !last_entries.is_empty() {
                            last_entries.append(&mut entries);

                            last_entries_total_sizes.0 += total_key_size;
                            last_entries_total_sizes.1 += total_value_size;

                            let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

                            let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                            let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                            keys_written += part1.len() as u64;
                            new_sst_files.push(create_sst_file(
                                part1,
                                last_entries_total_sizes.0 / 2,
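                                // Only an estimate: exact per-half sizes are
                                // not tracked.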
                                last_entries_total_sizes.1 / 2,
                                path,
                                seq1,
                            )?);

                            keys_written += part2.len() as u64;
                            new_sst_files.push(create_sst_file(
                                part2,
                                last_entries_total_sizes.0 / 2,
                                last_entries_total_sizes.1 / 2,
                                path,
                                seq2,
                            )?);
                        }
                        Ok(PartialMergeResult::Merged {
                            new_sst_files,
                            blob_seq_numbers_to_delete,
                            keys_written,
                        })
                    })
                    .collect::<Result<Vec<_>>>()
                    .with_context(|| {
                        format!("Failed to merge database files for family {family}")
                    })?;

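                // Pre-compute capacities for the combined result vectors.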
                let Some((sst_files_len, blob_delete_len)) = merge_result
                    .iter()
                    .map(|r| {
                        if let PartialMergeResult::Merged {
                            new_sst_files,
                            blob_seq_numbers_to_delete,
                            keys_written: _,
                        } = r
                        {
                            (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                        } else {
                            (0, 0)
                        }
                    })
                    .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                else {
                    unreachable!()
                };

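                // Register every resulting SST file (merged or moved) in a new
                // meta file for this family.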
                let mut new_sst_files = Vec::with_capacity(sst_files_len);
                let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                let mut meta_file_builder = MetaFileBuilder::new(family);

                let mut keys_written = 0;
                for result in merge_result {
                    match result {
                        PartialMergeResult::Merged {
                            new_sst_files: merged_new_sst_files,
                            blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                            keys_written: merged_keys_written,
                        } => {
                            for (seq, file, meta) in merged_new_sst_files {
                                meta_file_builder.add(seq, meta);
                                new_sst_files.push((seq, file));
                            }
                            blob_seq_numbers_to_delete.extend(merged_blob_seq_numbers_to_delete);
                            keys_written += merged_keys_written;
                        }
                        PartialMergeResult::Move { seq, meta } => {
                            meta_file_builder.add(seq, meta);
                        }
                    }
                }

                for &seq in sst_seq_numbers_to_delete.iter() {
                    meta_file_builder.add_obsolete_sst_file(seq);
                }

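                // The new meta file takes the next sequence number and also
                // records which SST files have become obsolete.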
                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                let meta_file = {
                    let _span = tracing::trace_span!("write meta file").entered();
                    meta_file_builder.write(&self.path, seq)?
                };

                Ok(PartialResultPerFamily {
                    new_meta_file: Some((seq, meta_file)),
                    new_sst_files,
                    sst_seq_numbers_to_delete,
                    blob_seq_numbers_to_delete,
                    keys_written,
                })
            })
            .collect::<Result<Vec<_>>>()?;

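        // Fold the per-family results into the caller's output parameters.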
        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }

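    /// Looks up a key in the given family. Meta files are scanned from newest
    /// to oldest, so the most recent value for a key wins.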
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.aqmf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_aqmf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => match result {
                        LookupValue::Deleted => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                            return Ok(None);
                        }
                        LookupValue::Slice { value } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                            return Ok(Some(value));
                        }
                        LookupValue::Blob { sequence_number } => {
                            #[cfg(feature = "stats")]
                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                            let blob = self.read_blob(sequence_number)?;
                            return Ok(Some(blob));
                        }
                    },
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }

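    /// Returns a snapshot of the database and cache statistics.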
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            aqmf_cache: CacheStatistics::new(&self.aqmf_cache),
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_aqmf: self.stats.miss_aqmf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }

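    /// Returns structural information about every meta file and its SST
    /// entries, newest first.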
    pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
        Ok(self
            .inner
            .read()
            .meta_files
            .iter()
            .rev()
            .map(|meta_file| {
                let entries = meta_file
                    .entries()
                    .iter()
                    .map(|entry| {
                        let aqmf = entry.raw_aqmf(meta_file.aqmf_data());
                        MetaFileEntryInfo {
                            sequence_number: entry.sequence_number(),
                            min_hash: entry.min_hash(),
                            max_hash: entry.max_hash(),
                            sst_size: entry.size(),
                            aqmf_size: entry.aqmf_size(),
                            aqmf_entries: aqmf.len(),
                            key_compression_dictionary_size: entry
                                .key_compression_dictionary_length(),
                            value_compression_dictionary_size: entry
                                .value_compression_dictionary_length(),
                            block_count: entry.block_count(),
                        }
                    })
                    .collect();
                MetaFileInfo {
                    sequence_number: meta_file.sequence_number(),
                    family: meta_file.family(),
                    obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
                    entries,
                }
            })
            .collect())
    }

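    /// Shuts down the database. With the `print_stats` feature enabled this
    /// prints the collected statistics.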
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
}

pub struct MetaFileInfo {
    pub sequence_number: u32,
    pub family: u32,
    pub obsolete_sst_files: Vec<u32>,
    pub entries: Vec<MetaFileEntryInfo>,
}

pub struct MetaFileEntryInfo {
    pub sequence_number: u32,
    pub min_hash: u64,
    pub max_hash: u64,
    pub aqmf_size: u32,
    pub aqmf_entries: usize,
    pub sst_size: u64,
    pub key_compression_dictionary_size: u16,
    pub value_compression_dictionary_size: u16,
    pub block_count: u16,
}