1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{self, File, OpenOptions, ReadDir},
5 io::{BufWriter, Write},
6 mem::swap,
7 ops::RangeInclusive,
8 path::{Path, PathBuf},
9 sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use jiff::Timestamp;
15use memmap2::Mmap;
16use parking_lot::{Mutex, RwLock};
17
18pub use crate::compaction::selector::CompactConfig;
19use crate::{
20 QueryKey,
21 arc_slice::ArcSlice,
22 compaction::selector::{Compactable, compute_metrics, get_merge_segments},
23 compression::decompress_into_arc,
24 constants::{
25 AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
26 KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
27 VALUE_BLOCK_CACHE_SIZE,
28 },
29 key::{StoreKey, hash_key},
30 lookup_entry::{LookupEntry, LookupValue},
31 merge_iter::MergeIter,
32 meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
33 meta_file_builder::MetaFileBuilder,
34 parallel_scheduler::ParallelScheduler,
35 sst_filter::SstFilter,
36 static_sorted_file::{BlockCache, SstLookupResult},
37 static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
38 write_batch::{FinishResult, WriteBatch},
39};
40
/// Point-in-time statistics for a single `quick_cache` cache instance.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Fraction of lookups that hit: hits / (hits + misses).
    pub hit_rate: f32,
    /// How full the cache is: current weight relative to capacity.
    pub fill: f32,
    /// Number of items currently stored in the cache.
    pub items: usize,
    /// Total weight currently stored (as reported by `cache.weight()`).
    pub size: u64,
    /// Cumulative number of cache hits.
    pub hits: u64,
    /// Cumulative number of cache misses.
    pub misses: u64,
}
51
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Snapshots the hit/miss/size counters of `cache` into a
    /// [`CacheStatistics`] value.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        let lookups = hits + misses;
        Self {
            // Guard against 0/0: a never-accessed cache would otherwise
            // report a NaN hit rate.
            hit_rate: if lookups == 0 {
                0.0
            } else {
                hits as f32 / lookups as f32
            },
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
75
/// Aggregated statistics over the whole database, produced by
/// `TurboPersistence::statistics`.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of meta files currently loaded.
    pub meta_files: usize,
    /// Total number of SST entries referenced by all meta files.
    pub sst_files: usize,
    /// Statistics of the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics of the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Statistics of the AMQF (approximate membership query filter) cache.
    pub amqf_cache: CacheStatistics,
    /// Successful lookups (deleted + small value + blob hits).
    pub hits: u64,
    /// Lookups that found no entry in any meta file.
    pub misses: u64,
    /// Lookups skipped because the meta file covers a different family.
    pub miss_family: u64,
    /// Lookups skipped because the hash is outside the file's range.
    pub miss_range: u64,
    /// Lookups rejected by the AMQF filter.
    pub miss_amqf: u64,
    /// Lookups that reached an SST file but did not find the key.
    pub miss_key: u64,
}
91
/// Lookup counters, incremented with relaxed ordering on the `get` hot path.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    // Hits that resolved to a tombstone (the key was deleted).
    hits_deleted: std::sync::atomic::AtomicU64,
    // Hits that returned an inline (small) value from an SST file.
    hits_small: std::sync::atomic::AtomicU64,
    // Hits that had to be resolved via a separate blob file.
    hits_blob: std::sync::atomic::AtomicU64,
    // Meta files skipped because of a family mismatch.
    miss_family: std::sync::atomic::AtomicU64,
    // Meta files skipped because the key hash is outside their range.
    miss_range: std::sync::atomic::AtomicU64,
    // Lookups rejected by the AMQF filter.
    miss_amqf: std::sync::atomic::AtomicU64,
    // SST lookups that did not find the key.
    miss_key: std::sync::atomic::AtomicU64,
    // Lookups that missed in every meta file.
    miss_global: std::sync::atomic::AtomicU64,
}
104
/// A persistent key-value store based on immutable, sorted files
/// (meta/SST/blob) with multi-version commits and background compaction.
pub struct TurboPersistence<S: ParallelScheduler> {
    // Scheduler used to parallelize file I/O and merge work.
    parallel_scheduler: S,
    // Directory containing CURRENT, LOG, *.meta, *.sst, *.blob, *.del files.
    path: PathBuf,
    // When true, no files are ever created, modified or deleted.
    read_only: bool,
    // Mutable state: loaded meta files and the committed sequence number.
    inner: RwLock<Inner>,
    // Guards the single-writer invariant (one write batch OR compaction).
    active_write_operation: AtomicBool,
    // Cache for deserialized AMQF filters.
    amqf_cache: AmqfCache,
    // Cache for decompressed key blocks.
    key_block_cache: BlockCache,
    // Cache for decompressed value blocks.
    value_block_cache: BlockCache,
    // Lookup counters, only compiled with the `stats` feature.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
129
/// State protected by the `TurboPersistence::inner` lock.
struct Inner {
    // Loaded meta files, oldest first; lookups iterate newest first.
    meta_files: Vec<MetaFile>,
    // Highest committed sequence number (mirrors the CURRENT file).
    current_sequence_number: u32,
}
137
/// Everything a single commit makes visible (new files) and invisible
/// (sequence numbers whose files become obsolete).
pub struct CommitOptions {
    // Freshly written meta files (sequence number, synced file handle).
    new_meta_files: Vec<(u32, File)>,
    // Freshly written SST files.
    new_sst_files: Vec<(u32, File)>,
    // Freshly written blob files.
    new_blob_files: Vec<(u32, File)>,
    // SST sequence numbers to record as deleted and remove from disk.
    sst_seq_numbers_to_delete: Vec<u32>,
    // Blob sequence numbers to record as deleted and remove from disk.
    blob_seq_numbers_to_delete: Vec<u32>,
    // Highest sequence number used by this commit.
    sequence_number: u32,
    // Number of keys written, for the human-readable LOG.
    keys_written: u64,
}
147
impl<S: ParallelScheduler + Default> TurboPersistence<S> {
    /// Opens (or creates) the database at `path` for reading and writing,
    /// using a default-constructed scheduler.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Opens the database at `path` in read-only mode, using a
    /// default-constructed scheduler.
    pub fn open_read_only(path: PathBuf) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, Default::default())
    }
}
163
164impl<S: ParallelScheduler> TurboPersistence<S> {
165 fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
166 Self {
167 parallel_scheduler,
168 path,
169 read_only,
170 inner: RwLock::new(Inner {
171 meta_files: Vec::new(),
172 current_sequence_number: 0,
173 }),
174 active_write_operation: AtomicBool::new(false),
175 amqf_cache: AmqfCache::with(
176 AMQF_CACHE_SIZE as usize / AMQF_AVG_SIZE,
177 AMQF_CACHE_SIZE,
178 Default::default(),
179 Default::default(),
180 Default::default(),
181 ),
182 key_block_cache: BlockCache::with(
183 KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
184 KEY_BLOCK_CACHE_SIZE,
185 Default::default(),
186 Default::default(),
187 Default::default(),
188 ),
189 value_block_cache: BlockCache::with(
190 VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
191 VALUE_BLOCK_CACHE_SIZE,
192 Default::default(),
193 Default::default(),
194 Default::default(),
195 ),
196 #[cfg(feature = "stats")]
197 stats: TrackedStats::default(),
198 }
199 }
200
201 pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
206 let mut db = Self::new(path, false, parallel_scheduler);
207 db.open_directory(false)?;
208 Ok(db)
209 }
210
211 pub fn open_read_only_with_parallel_scheduler(
214 path: PathBuf,
215 parallel_scheduler: S,
216 ) -> Result<Self> {
217 let mut db = Self::new(path, true, parallel_scheduler);
218 db.open_directory(false)?;
219 Ok(db)
220 }
221
    /// Opens the directory at `self.path`, loading any existing database
    /// files. When `read_only` is false, a missing or uninitialized
    /// directory is created and initialized instead of failing.
    fn open_directory(&mut self, read_only: bool) -> Result<()> {
        match fs::read_dir(&self.path) {
            Ok(entries) => {
                if !self
                    .load_directory(entries, read_only)
                    .context("Loading persistence directory failed")?
                {
                    // The directory exists but holds no initialized database.
                    if read_only {
                        bail!("Failed to open database");
                    }
                    self.init_directory()
                        .context("Initializing persistence directory failed")?;
                }
                Ok(())
            }
            Err(e) => {
                // Only a writable open may create a missing directory.
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    self.create_and_init_directory()
                        .context("Creating and initializing persistence directory failed")?;
                    Ok(())
                } else {
                    Err(e).context("Failed to open database")
                }
            }
        }
    }
249
    /// Creates the persistence directory (including parents) and writes the
    /// initial `CURRENT` file.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
255
256 fn init_directory(&mut self) -> Result<()> {
258 let mut current = File::create(self.path.join("CURRENT"))?;
259 current.write_u32::<BE>(0)?;
260 current.flush()?;
261 Ok(())
262 }
263
    /// Scans the persistence directory and loads all committed meta files.
    ///
    /// Returns `Ok(false)` when the directory is not an initialized database
    /// (no `CURRENT` file) so the caller may initialize it. In writable mode
    /// this also cleans up files from uncommitted writes (sequence number
    /// above `CURRENT`) and files recorded as deleted in `.del` files.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    // Not initialized yet; the caller will initialize it.
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        // CURRENT holds the highest committed sequence number (big-endian).
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Data file names follow `{sequence_number:08}.{ext}`.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Leftover from an uncommitted write; remove it when
                    // we are allowed to modify the directory.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A `.del` file lists sequence numbers whose
                            // files were logically deleted by a commit.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                // Shadows the outer `seq`: this is a deleted
                                // sequence number read from the payload.
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            if !read_only && no_existing_files {
                                // Every listed file is already gone, so the
                                // `.del` file itself is no longer needed.
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // Data files are opened lazily via their meta file.
                        }
                        _ => {
                            // Tolerate hidden files (e.g. `.DS_Store`).
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read above.
                    }
                    Some("LOG") => {
                        // Human-readable commit log; never loaded.
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        // Open all surviving meta files in parallel, in sequence order.
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Apply the SST filter newest-first so newer meta files take
        // precedence over older ones.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
378
    /// Reads and decompresses the blob file `{seq:08}.blob`.
    ///
    /// The file layout is a big-endian u32 uncompressed length followed by
    /// the compressed payload.
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        // SAFETY: memory-mapping is only sound if the file is not modified
        // while mapped. Blob files appear to be written once and only ever
        // deleted — NOTE(review): confirm they are never rewritten in place.
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        // Hint the kernel that we will read the whole mapping sequentially.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()?;

        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
        Ok(ArcSlice::from(buffer))
    }
397
398 pub fn is_empty(&self) -> bool {
400 self.inner.read().meta_files.is_empty()
401 }
402
    /// Starts a new write batch for a database with `FAMILIES` key families.
    ///
    /// Only a single write batch or compaction may be active at a time; the
    /// exclusive slot is acquired here and released by `commit_write_batch`.
    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
    ) -> Result<WriteBatch<K, S, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        // Claim the single-writer slot; fails if a batch/compaction is active.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }
        let current = self.inner.read().current_sequence_number;
        Ok(WriteBatch::new(
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
        ))
    }
430
431 fn open_log(&self) -> Result<BufWriter<File>> {
432 if self.read_only {
433 unreachable!("Only write operations can open the log file");
434 }
435 let log_path = self.path.join("LOG");
436 let log_file = OpenOptions::new()
437 .create(true)
438 .append(true)
439 .open(log_path)?;
440 Ok(BufWriter::new(log_file))
441 }
442
    /// Finishes the given write batch and commits its files, then releases
    /// the exclusive write slot acquired by `write_batch`.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish()?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            // A plain write batch never deletes existing files.
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        // NOTE(review): if `finish`/`commit` fail above, the flag stays set
        // and all further writes are rejected — confirm this is intended.
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }
471
    /// Atomically makes new files visible and obsolete files invisible.
    ///
    /// Ordering matters for crash safety: new files are fsynced first, then
    /// the in-memory state is swapped, a `.del` file recording deletions is
    /// written, `CURRENT` is bumped, and only afterwards are obsolete files
    /// physically removed. A crash at any point leaves a recoverable state
    /// for `load_directory`.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            mut new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        // Sync and (re-)open the freshly written meta files in parallel.
        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Apply the SST filter newest-first so newer meta files win.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        // Make sure all new data is durable before it becomes reachable.
        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;

        // Capture a LOG summary of the new meta files before they are moved
        // into `inner` below.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        (seq, range.min_hash, range.max_hash, size)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            // Swap in the new state under the write lock.
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // Walk newest-first (reverse, retain, reverse back) and drop
            // meta files that became fully obsolete.
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                // The `.del` file itself consumes its own sequence number.
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                // Persist the deletions in a `.del` file before removing any
                // file, so recovery can finish the cleanup after a crash.
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            // Publishing the new sequence number makes the commit durable.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            // Only now is it safe to physically remove the obsolete files.
            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            // Append a human-readable record of this commit to the LOG.
            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                for (seq, family, ssts, obsolete) in new_meta_info {
                    writeln!(log, "{seq:08} META family:{family}",)?;
                    for (seq, min, max, size) in ssts {
                        writeln!(
                            log,
                            " {seq:08} SST {min:016x}-{max:016x} {} MiB",
                            size / 1024 / 1024
                        )?;
                    }
                    for seq in obsolete {
                        writeln!(log, " {seq:08} OBSOLETE SST")?;
                    }
                }
                new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_sst_files.iter() {
                    writeln!(log, "{seq:08} NEW SST")?;
                }
                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_blob_files.iter() {
                    writeln!(log, "{seq:08} NEW BLOB")?;
                }
                for seq in sst_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} SST DELETED")?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} META DELETED")?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} BLOB DELETED")?;
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }
648
649 pub fn full_compact(&self) -> Result<()> {
652 self.compact(&CompactConfig {
653 min_merge_count: 2,
654 optimal_merge_count: usize::MAX,
655 max_merge_count: usize::MAX,
656 max_merge_bytes: u64::MAX,
657 min_merge_duplication_bytes: 0,
658 optimal_merge_duplication_bytes: u64::MAX,
659 max_merge_segment_count: usize::MAX,
660 })?;
661 Ok(())
662 }
663
    /// Runs a single compaction pass with the given configuration.
    ///
    /// Returns `Ok(true)` when any files were merged and committed, and
    /// `Ok(false)` when there was nothing to compact. Takes the same
    /// exclusive write slot as `write_batch`.
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        // Claim the single-writer slot; fails if a batch/compaction is active.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            // Hold the read lock only while planning/merging; `commit` takes
            // the write lock itself.
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            // NOTE(review): on error the write slot is never released —
            // confirm whether the database is intentionally write-locked
            // after a failed compaction.
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }
725
    /// Performs the actual compaction work for `compact`.
    ///
    /// Flattens all SST entries, groups them by family, selects merge
    /// segments per family, and merges the selected SSTs in parallel.
    /// Results are appended to the `new_*` / `*_to_delete` output vectors;
    /// fresh sequence numbers are drawn from `sequence_number`.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        // One SST entry flattened out of its meta file, remembering where it
        // came from (`meta_index` / `index_in_meta`).
        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }
        }

        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                    })
            })
            .collect::<Vec<_>>();

        // NOTE(review): this `unwrap` assumes at least one SST entry exists;
        // non-empty meta files without entries would panic here — confirm.
        let families = ssts_with_ranges
            .iter()
            .map(|s| s.range.family)
            .max()
            .unwrap() as usize
            + 1;

        // Group the SSTs by family; compaction never merges across families.
        let mut sst_by_family = Vec::with_capacity(families);
        sst_by_family.resize_with(families, Vec::new);

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        // Serializes LOG writes coming from concurrent per-family jobs.
        let log_mutex = Mutex::new(());

        // Per-family compaction outcome, folded into the output vectors at
        // the end.
        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        // Select merge segments per family, consuming the shared
        // `max_merge_segment_count` budget as we go.
        let mut compact_config = compact_config.clone();
        let merge_jobs = sst_by_family
            .into_iter()
            .enumerate()
            .filter_map(|(family, ssts_with_ranges)| {
                if compact_config.max_merge_segment_count == 0 {
                    // Budget exhausted: remaining families are skipped.
                    return None;
                }
                let (merge_jobs, real_merge_job_size) =
                    get_merge_segments(&ssts_with_ranges, &compact_config);
                // Presumably `get_merge_segments` never exceeds the budget,
                // so this subtraction cannot underflow — TODO confirm.
                compact_config.max_merge_segment_count -= real_merge_job_size;
                Some((family, ssts_with_ranges, merge_jobs))
            })
            .collect::<Vec<_>>();

        let result = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                merge_jobs,
                |(family, ssts_with_ranges, merge_jobs)| {
                    let family = family as u32;

                    if merge_jobs.is_empty() {
                        // Nothing to do for this family.
                        return Ok(PartialResultPerFamily {
                            new_meta_file: None,
                            new_sst_files: Vec::new(),
                            sst_seq_numbers_to_delete: Vec::new(),
                            blob_seq_numbers_to_delete: Vec::new(),
                            keys_written: 0,
                        });
                    }

                    // Log the planned merges for this family.
                    self.parallel_scheduler.block_in_place(|| {
                        let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
                        let guard = log_mutex.lock();
                        let mut log = self.open_log()?;
                        writeln!(
                            log,
                            "Compaction for family {family} (coverage: {}, overlap: {}, \
                             duplication: {} / {} MiB):",
                            metrics.coverage,
                            metrics.overlap,
                            metrics.duplication,
                            metrics.duplicated_size / 1024 / 1024
                        )?;
                        for job in merge_jobs.iter() {
                            writeln!(log, " merge")?;
                            for i in job.iter() {
                                let seq = ssts_with_ranges[*i].seq;
                                let (min, max) = ssts_with_ranges[*i].range().into_inner();
                                writeln!(log, " {seq:08} {min:016x}-{max:016x}")?;
                            }
                        }
                        drop(guard);
                        anyhow::Ok(())
                    })?;

                    // SSTs that take part in a real merge (more than one
                    // input) become obsolete afterwards.
                    let sst_seq_numbers_to_delete = merge_jobs
                        .iter()
                        .filter(|l| l.len() > 1)
                        .flat_map(|l| l.iter().copied())
                        .map(|index| ssts_with_ranges[index].seq)
                        .collect::<Vec<_>>();

                    let span = tracing::trace_span!("merge files");
                    // Outcome of a single merge job: either freshly written
                    // SST files, or a single unchanged SST that is moved into
                    // the new meta file.
                    enum PartialMergeResult<'l> {
                        Merged {
                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                            blob_seq_numbers_to_delete: Vec<u32>,
                            keys_written: u64,
                        },
                        Move {
                            seq: u32,
                            meta: StaticSortedFileBuilderMeta<'l>,
                        },
                    }
                    let merge_result = self
                        .parallel_scheduler
                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
                            let _span = span.clone().entered();
                            if indices.len() == 1 {
                                // Single-file segment: keep the existing SST
                                // and just copy its metadata into the new
                                // meta file instead of rewriting the data.
                                let index = indices[0];
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta_file = &meta_files[meta_index];
                                let entry = meta_file.entry(index_in_meta);
                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
                                let meta = StaticSortedFileBuilderMeta {
                                    min_hash: entry.min_hash(),
                                    max_hash: entry.max_hash(),
                                    amqf,
                                    key_compression_dictionary_length: entry
                                        .key_compression_dictionary_length(),
                                    block_count: entry.block_count(),
                                    size: entry.size(),
                                    entries: 0,
                                };
                                return Ok(PartialMergeResult::Move {
                                    seq: entry.sequence_number(),
                                    meta,
                                });
                            }

                            // Writes `entries` as a new SST file `{seq:08}.sst`.
                            fn create_sst_file<'l, S: ParallelScheduler>(
                                parallel_scheduler: &S,
                                entries: &[LookupEntry<'l>],
                                total_key_size: usize,
                                path: &Path,
                                seq: u32,
                            ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                            {
                                let _span = tracing::trace_span!("write merged sst file").entered();
                                let (meta, file) = parallel_scheduler.block_in_place(|| {
                                    write_static_stored_file(
                                        entries,
                                        total_key_size,
                                        &path.join(format!("{seq:08}.sst")),
                                    )
                                })?;
                                Ok((seq, file, meta))
                            }

                            let mut new_sst_files = Vec::new();

                            // One iterator per input SST; merged in key order.
                            let iters = indices
                                .iter()
                                .map(|&index| {
                                    let meta_index = ssts_with_ranges[index].meta_index;
                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                    let meta = &meta_files[meta_index];
                                    meta.entry(index_in_meta)
                                        .sst(meta)?
                                        .iter(key_block_cache, value_block_cache)
                                })
                                .collect::<Result<Vec<_>>>()?;

                            let iter = MergeIter::new(iters.into_iter())?;

                            // NOTE(review): never filled here — blobs are not
                            // garbage collected during compaction.
                            let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                            let mut keys_written = 0;

                            let mut total_key_size = 0;
                            let mut total_value_size = 0;
                            let mut current: Option<LookupEntry<'_>> = None;
                            let mut entries = Vec::new();
                            // The previously flushed chunk is kept around so
                            // the last two chunks can be rebalanced at the end.
                            let mut last_entries = Vec::new();
                            let mut last_entries_total_key_size = 0;
                            for entry in iter {
                                let entry = entry?;

                                // Deduplicate by key: when keys are equal the
                                // earlier-yielded entry is discarded (assumes
                                // `MergeIter` yields the authoritative
                                // duplicate later — TODO confirm ordering).
                                if let Some(current) = current.take() {
                                    if current.key != entry.key {
                                        let key_size = current.key.len();
                                        let value_size = current.value.uncompressed_size_in_sst();
                                        total_key_size += key_size;
                                        total_value_size += value_size;

                                        if total_key_size + total_value_size
                                            > DATA_THRESHOLD_PER_COMPACTED_FILE
                                            || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
                                        {
                                            // Chunk full: flush the previous
                                            // chunk and start a new one that
                                            // begins with `current`.
                                            let selected_total_key_size =
                                                last_entries_total_key_size;
                                            swap(&mut entries, &mut last_entries);
                                            last_entries_total_key_size = total_key_size - key_size;
                                            total_key_size = key_size;
                                            total_value_size = value_size;

                                            if !entries.is_empty() {
                                                let seq = sequence_number
                                                    .fetch_add(1, Ordering::SeqCst)
                                                    + 1;

                                                keys_written += entries.len() as u64;
                                                new_sst_files.push(create_sst_file(
                                                    &self.parallel_scheduler,
                                                    &entries,
                                                    selected_total_key_size,
                                                    path,
                                                    seq,
                                                )?);

                                                entries.clear();
                                            }
                                        }

                                        entries.push(current);
                                    } else {
                                        // Duplicate key: drop this entry.
                                    }
                                }
                                current = Some(entry);
                            }
                            // Flush the trailing entry held in `current`.
                            if let Some(entry) = current {
                                total_key_size += entry.key.len();
                                entries.push(entry);
                            }

                            if last_entries.is_empty() && !entries.is_empty() {
                                // Everything fit into a single file.
                                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += entries.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    &entries,
                                    total_key_size,
                                    path,
                                    seq,
                                )?);
                            } else if !last_entries.is_empty() {
                                // Rebalance the last two chunks into two
                                // roughly equal halves to avoid a tiny
                                // trailing SST file.
                                last_entries.append(&mut entries);

                                last_entries_total_key_size += total_key_size;

                                let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

                                let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                                let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += part1.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part1,
                                    // Approximation: key bytes are assumed to
                                    // be evenly split between the halves.
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq1,
                                )?);

                                keys_written += part2.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part2,
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq2,
                                )?);
                            }
                            Ok(PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written,
                            })
                        })
                        .with_context(|| {
                            format!("Failed to merge database files for family {family}")
                        })?;

                    // Pre-compute capacities for the flattened result vectors.
                    let Some((sst_files_len, blob_delete_len)) = merge_result
                        .iter()
                        .map(|r| {
                            if let PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written: _,
                            } = r
                            {
                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                            } else {
                                (0, 0)
                            }
                        })
                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                    else {
                        // `merge_jobs` was checked to be non-empty above.
                        unreachable!()
                    };

                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                    let mut meta_file_builder = MetaFileBuilder::new(family);

                    let mut keys_written = 0;
                    for result in merge_result {
                        match result {
                            PartialMergeResult::Merged {
                                new_sst_files: merged_new_sst_files,
                                blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                                keys_written: merged_keys_written,
                            } => {
                                for (seq, file, meta) in merged_new_sst_files {
                                    meta_file_builder.add(seq, meta);
                                    new_sst_files.push((seq, file));
                                }
                                blob_seq_numbers_to_delete
                                    .extend(merged_blob_seq_numbers_to_delete);
                                keys_written += merged_keys_written;
                            }
                            PartialMergeResult::Move { seq, meta } => {
                                meta_file_builder.add(seq, meta);
                            }
                        }
                    }

                    // Record the merged inputs as obsolete in the new meta
                    // file so recovery can filter them out.
                    for &seq in sst_seq_numbers_to_delete.iter() {
                        meta_file_builder.add_obsolete_sst_file(seq);
                    }

                    let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let meta_file = {
                        let _span = tracing::trace_span!("write meta file").entered();
                        self.parallel_scheduler
                            .block_in_place(|| meta_file_builder.write(&self.path, seq))?
                    };

                    Ok(PartialResultPerFamily {
                        new_meta_file: Some((seq, meta_file)),
                        new_sst_files,
                        sst_seq_numbers_to_delete,
                        blob_seq_numbers_to_delete,
                        keys_written,
                    })
                },
            )?;

        // Fold the per-family results into the caller's output vectors.
        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }
1152
    /// Looks up `key` in the given `family`, checking meta files newest
    /// first so more recent writes shadow older ones.
    ///
    /// Returns `Ok(None)` when the key does not exist or was deleted.
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.amqf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                // The three miss variants only skip this meta file; the
                // search continues with the next older one.
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => match result {
                        LookupValue::Deleted => {
                            // A tombstone shadows older values: stop here.
                            #[cfg(feature = "stats")]
                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                            return Ok(None);
                        }
                        LookupValue::Slice { value } => {
                            // Value stored inline in the SST file.
                            #[cfg(feature = "stats")]
                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                            return Ok(Some(value));
                        }
                        LookupValue::Blob { sequence_number } => {
                            // Large value stored in a separate blob file.
                            #[cfg(feature = "stats")]
                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                            let blob = self.read_blob(sequence_number)?;
                            return Ok(Some(blob));
                        }
                    },
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }
1209
    /// Returns a snapshot of cache and lookup statistics.
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            amqf_cache: CacheStatistics::new(&self.amqf_cache),
            // A "hit" is any lookup that resolved to an entry, including
            // tombstones.
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }
1230
1231 pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1232 Ok(self
1233 .inner
1234 .read()
1235 .meta_files
1236 .iter()
1237 .rev()
1238 .map(|meta_file| {
1239 let entries = meta_file
1240 .entries()
1241 .iter()
1242 .map(|entry| {
1243 let amqf = entry.raw_amqf(meta_file.amqf_data());
1244 MetaFileEntryInfo {
1245 sequence_number: entry.sequence_number(),
1246 min_hash: entry.min_hash(),
1247 max_hash: entry.max_hash(),
1248 sst_size: entry.size(),
1249 amqf_size: entry.amqf_size(),
1250 amqf_entries: amqf.len(),
1251 key_compression_dictionary_size: entry
1252 .key_compression_dictionary_length(),
1253 block_count: entry.block_count(),
1254 }
1255 })
1256 .collect();
1257 MetaFileInfo {
1258 sequence_number: meta_file.sequence_number(),
1259 family: meta_file.family(),
1260 obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1261 entries,
1262 }
1263 })
1264 .collect())
1265 }
1266
    /// Shuts the database down. All committed data is already durable, so
    /// this only optionally prints statistics.
    pub fn shutdown(&self) -> Result<()> {
        // Presumably the `print_stats` feature implies `stats` (required by
        // `self.statistics()`) — TODO confirm in the crate manifest.
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
1273}
1274
/// Debug/inspection description of a single meta file, see
/// `TurboPersistence::meta_info`.
pub struct MetaFileInfo {
    /// Sequence number of the meta file.
    pub sequence_number: u32,
    /// Key family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files recorded as obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// Per-SST details for every entry referenced by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
1281
/// Debug/inspection description of a single SST entry within a meta file.
pub struct MetaFileEntryInfo {
    /// Sequence number of the SST file.
    pub sequence_number: u32,
    /// Smallest key hash covered by the SST file.
    pub min_hash: u64,
    /// Largest key hash covered by the SST file.
    pub max_hash: u64,
    /// Size of the serialized AMQF filter in bytes.
    pub amqf_size: u32,
    /// Length of the raw AMQF data slice.
    pub amqf_entries: usize,
    /// Size of the SST file in bytes.
    pub sst_size: u64,
    /// Length of the key compression dictionary.
    pub key_compression_dictionary_size: u16,
    /// Number of blocks in the SST file.
    pub block_count: u16,
}