1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{self, File, OpenOptions, ReadDir},
5 io::{BufWriter, Write},
6 mem::swap,
7 ops::RangeInclusive,
8 path::{Path, PathBuf},
9 sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use jiff::Timestamp;
15use memmap2::Mmap;
16use parking_lot::{Mutex, RwLock};
17
18pub use crate::compaction::selector::CompactConfig;
19use crate::{
20 QueryKey,
21 arc_slice::ArcSlice,
22 compaction::selector::{Compactable, compute_metrics, get_merge_segments},
23 compression::decompress_into_arc,
24 constants::{
25 AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
26 KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
27 VALUE_BLOCK_CACHE_SIZE,
28 },
29 key::{StoreKey, hash_key},
30 lookup_entry::{LookupEntry, LookupValue},
31 merge_iter::MergeIter,
32 meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
33 meta_file_builder::MetaFileBuilder,
34 parallel_scheduler::ParallelScheduler,
35 sst_filter::SstFilter,
36 static_sorted_file::{BlockCache, SstLookupResult},
37 static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
38 write_batch::{FinishResult, WriteBatch},
39};
40
/// A point-in-time snapshot of usage statistics for one of the in-memory
/// caches (AMQF, key block, or value block cache).
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Fraction of lookups served from the cache: hits / (hits + misses).
    pub hit_rate: f32,
    /// How full the cache is: total weight / capacity.
    pub fill: f32,
    /// Number of items currently stored in the cache.
    pub items: usize,
    /// Total weight (approximate size) of the cached items.
    pub size: u64,
    /// Number of cache hits recorded so far.
    pub hits: u64,
    /// Number of cache misses recorded so far.
    pub misses: u64,
}
51
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Builds a statistics snapshot from a `quick_cache` cache.
    ///
    /// `hit_rate` is reported as `0.0` (instead of NaN) when the cache has
    /// not seen any lookups yet.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        let lookups = hits + misses;
        Self {
            // Guard against 0/0 producing a NaN hit rate before the first
            // lookup.
            hit_rate: if lookups == 0 {
                0.0
            } else {
                hits as f32 / lookups as f32
            },
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
75
/// Aggregated database statistics, returned by `statistics` (only with the
/// `stats` feature).
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of loaded meta files.
    pub meta_files: usize,
    /// Total number of SST entries across all meta files.
    pub sst_files: usize,
    /// Statistics of the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics of the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Statistics of the AMQF cache.
    pub amqf_cache: CacheStatistics,
    /// Lookups that found something (deleted, small, or blob values).
    pub hits: u64,
    /// Lookups that found nothing in any meta file.
    pub misses: u64,
    /// Per-meta-file probes skipped due to family mismatch.
    pub miss_family: u64,
    /// Per-meta-file probes skipped due to hash range mismatch.
    pub miss_range: u64,
    /// Per-meta-file probes rejected by the AMQF filter.
    pub miss_amqf: u64,
    /// SST lookups that did not contain the key.
    pub miss_key: u64,
}
91
/// Atomic counters backing [`Statistics`], incremented from `get`.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    // Lookups that hit a tombstone (key was deleted).
    hits_deleted: std::sync::atomic::AtomicU64,
    // Lookups that returned an inline (small) value.
    hits_small: std::sync::atomic::AtomicU64,
    // Lookups that returned a value stored in a blob file.
    hits_blob: std::sync::atomic::AtomicU64,
    // Meta files skipped because the family did not match.
    miss_family: std::sync::atomic::AtomicU64,
    // Meta files skipped because the hash was out of range.
    miss_range: std::sync::atomic::AtomicU64,
    // Meta files skipped by the AMQF filter.
    miss_amqf: std::sync::atomic::AtomicU64,
    // SST lookups that did not contain the key.
    miss_key: std::sync::atomic::AtomicU64,
    // Lookups that found nothing in any meta file.
    miss_global: std::sync::atomic::AtomicU64,
}
104
/// A persistent key-value store built from immutable, sorted on-disk files
/// (meta/SST/blob), with concurrent reads and a single-writer model.
pub struct TurboPersistence<S: ParallelScheduler> {
    /// Scheduler used to parallelize file loading, commits, and compaction.
    parallel_scheduler: S,
    /// Directory containing all database files.
    path: PathBuf,
    /// When set, all mutating operations are rejected.
    read_only: bool,
    /// Loaded meta files plus the current sequence number.
    inner: RwLock<Inner>,
    /// Guard ensuring only one write batch or compaction is active at a time.
    active_write_operation: AtomicBool,
    /// Cache for approximate-membership-query filters.
    amqf_cache: AmqfCache,
    /// Cache for SST key blocks.
    key_block_cache: BlockCache,
    /// Cache for SST value blocks.
    value_block_cache: BlockCache,
    /// Hit/miss counters (only with the `stats` feature).
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
129
/// Mutable state guarded by `TurboPersistence::inner`.
struct Inner {
    /// All live meta files; probed in reverse order, so later entries win.
    meta_files: Vec<MetaFile>,
    /// Highest committed sequence number (mirrored in the `CURRENT` file).
    current_sequence_number: u32,
}
137
/// Everything a single `commit` publishes: freshly written files plus files
/// that became obsolete and must be deleted.
pub struct CommitOptions {
    /// Newly written meta files as (sequence number, file handle).
    new_meta_files: Vec<(u32, File)>,
    /// Newly written SST files.
    new_sst_files: Vec<(u32, File)>,
    /// Newly written blob files.
    new_blob_files: Vec<(u32, File)>,
    /// Sequence numbers of SST files to delete after the commit point.
    sst_seq_numbers_to_delete: Vec<u32>,
    /// Sequence numbers of blob files to delete after the commit point.
    blob_seq_numbers_to_delete: Vec<u32>,
    /// Highest sequence number used by the new files.
    sequence_number: u32,
    /// Number of keys written, recorded in the LOG.
    keys_written: u64,
}
147
148impl<S: ParallelScheduler + Default> TurboPersistence<S> {
149 pub fn open(path: PathBuf) -> Result<Self> {
154 Self::open_with_parallel_scheduler(path, Default::default())
155 }
156
157 pub fn open_read_only(path: PathBuf) -> Result<Self> {
160 Self::open_read_only_with_parallel_scheduler(path, Default::default())
161 }
162}
163
164impl<S: ParallelScheduler> TurboPersistence<S> {
165 fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
166 Self {
167 parallel_scheduler,
168 path,
169 read_only,
170 inner: RwLock::new(Inner {
171 meta_files: Vec::new(),
172 current_sequence_number: 0,
173 }),
174 active_write_operation: AtomicBool::new(false),
175 amqf_cache: AmqfCache::with(
176 AMQF_CACHE_SIZE as usize / AMQF_AVG_SIZE,
177 AMQF_CACHE_SIZE,
178 Default::default(),
179 Default::default(),
180 Default::default(),
181 ),
182 key_block_cache: BlockCache::with(
183 KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
184 KEY_BLOCK_CACHE_SIZE,
185 Default::default(),
186 Default::default(),
187 Default::default(),
188 ),
189 value_block_cache: BlockCache::with(
190 VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
191 VALUE_BLOCK_CACHE_SIZE,
192 Default::default(),
193 Default::default(),
194 Default::default(),
195 ),
196 #[cfg(feature = "stats")]
197 stats: TrackedStats::default(),
198 }
199 }
200
201 pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
206 let mut db = Self::new(path, false, parallel_scheduler);
207 db.open_directory(false)?;
208 Ok(db)
209 }
210
211 pub fn open_read_only_with_parallel_scheduler(
214 path: PathBuf,
215 parallel_scheduler: S,
216 ) -> Result<Self> {
217 let mut db = Self::new(path, true, parallel_scheduler);
218 db.open_directory(false)?;
219 Ok(db)
220 }
221
222 fn open_directory(&mut self, read_only: bool) -> Result<()> {
224 match fs::read_dir(&self.path) {
225 Ok(entries) => {
226 if !self
227 .load_directory(entries, read_only)
228 .context("Loading persistence directory failed")?
229 {
230 if read_only {
231 bail!("Failed to open database");
232 }
233 self.init_directory()
234 .context("Initializing persistence directory failed")?;
235 }
236 Ok(())
237 }
238 Err(e) => {
239 if !read_only && e.kind() == std::io::ErrorKind::NotFound {
240 self.create_and_init_directory()
241 .context("Creating and initializing persistence directory failed")?;
242 Ok(())
243 } else {
244 Err(e).context("Failed to open database")
245 }
246 }
247 }
248 }
249
    /// Creates the persistence directory (including parents) and writes the
    /// initial `CURRENT` file.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
255
256 fn init_directory(&mut self) -> Result<()> {
258 let mut current = File::create(self.path.join("CURRENT"))?;
259 current.write_u32::<BE>(0)?;
260 current.flush()?;
261 Ok(())
262 }
263
    /// Scans the persistence directory and loads all live meta files.
    ///
    /// Returns `Ok(false)` when the directory has no `CURRENT` file (not an
    /// initialized database) and `read_only` is false. When writable, this
    /// also garbage-collects: files with a sequence number newer than
    /// `CURRENT` (leftovers from an interrupted commit) and files listed in
    /// `*.del` files are removed.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        // CURRENT holds the last committed sequence number (big endian u32).
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // All database files are named `{seq:08}.{ext}`.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Leftover from an interrupted commit: never committed,
                    // safe to remove.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A `.del` file is a list of sequence numbers
                            // whose files should have been deleted; finish
                            // that deletion now.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            // Once every listed file is gone the `.del` file
                            // itself has no purpose anymore.
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // Data files are opened lazily via meta files.
                        }
                        _ => {
                            // Tolerate hidden files (e.g. `.DS_Store`), but
                            // refuse anything else unknown.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read above.
                    }
                    Some("LOG") => {
                        // Human-readable commit log; not needed for loading.
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        // Drop meta files that a `.del` file (processed after them) marked as
        // deleted, then load the survivors in order, in parallel.
        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Mark SST entries shadowed by newer meta files as obsolete,
        // newest-first.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
378
    /// Memory-maps the blob file `{seq:08}.blob` and returns its
    /// decompressed contents.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        // SAFETY: blob files appear to be written once and then treated as
        // immutable by this database, which makes mapping them sound —
        // TODO confirm no external process can truncate or modify the file
        // while mapped (that would be undefined behavior).
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        // Hint the kernel about the access pattern: the whole file is read
        // once, sequentially, soon.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        // Layout: a big-endian u32 uncompressed length, then the compressed
        // payload.
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()?;

        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
        Ok(ArcSlice::from(buffer))
    }
398
399 pub fn is_empty(&self) -> bool {
401 self.inner.read().meta_files.is_empty()
402 }
403
    /// Starts a new write batch for a database with `FAMILIES` families.
    ///
    /// Only a single write batch or compaction may be active at a time; the
    /// exclusive slot is claimed here and released by `commit_write_batch`.
    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
    ) -> Result<WriteBatch<K, S, FAMILIES>> {
        if self.read_only {
            bail!("Cannot write to a read-only database");
        }
        // Claim the single-writer slot; fails if another write batch or
        // compaction is currently running.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }
        // New files written by the batch get sequence numbers after the
        // current one.
        let current = self.inner.read().current_sequence_number;
        Ok(WriteBatch::new(
            self.path.clone(),
            current,
            self.parallel_scheduler.clone(),
        ))
    }
431
432 fn open_log(&self) -> Result<BufWriter<File>> {
433 if self.read_only {
434 unreachable!("Only write operations can open the log file");
435 }
436 let log_path = self.path.join("LOG");
437 let log_file = OpenOptions::new()
438 .create(true)
439 .append(true)
440 .open(log_path)?;
441 Ok(BufWriter::new(log_file))
442 }
443
    /// Finishes and commits the given write batch, then releases the
    /// single-writer slot claimed by `write_batch`.
    ///
    /// NOTE(review): if `finish` or `commit` fails, `active_write_operation`
    /// stays set and blocks all further writes — presumably intentional
    /// poisoning of a possibly-inconsistent database; confirm before relying
    /// on recovery after a failed commit.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        // Flush all buffered data of the batch into new on-disk files.
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish()?;
        // A write batch only adds files; it never deletes existing ones.
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }
472
    /// Atomically publishes the result of a write batch or compaction.
    ///
    /// Crash-safety ordering:
    /// 1. fsync all new meta/SST/blob files,
    /// 2. swap the new state into memory,
    /// 3. durably record pending deletions in a `.del` file (if any),
    /// 4. update and fsync `CURRENT` (the commit point),
    /// 5. delete obsolete files and append to `LOG`.
    ///
    /// A crash before step 4 leaves the previous state intact; files newer
    /// than `CURRENT` are cleaned up on the next writable open.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            mut new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        // Sync the freshly written meta files and reopen them as MetaFile.
        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Mark SST entries shadowed among the new meta files, newest-first.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        // All new data must be durable before CURRENT is advanced.
        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;

        // Capture a description of the new meta files for the LOG below.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        (seq, range.min_hash, range.max_hash, size)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            // Swap the new state in under the write lock.
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // Walk newest-to-oldest and drop meta files that became fully
            // obsolete; `reverse` twice keeps the original order.
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                // The `.del` file gets its own sequence number.
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                // Record the deletions durably before removing anything, so
                // a crash cannot lose track of files that must be deleted.
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            // Commit point: advancing CURRENT makes the new state visible to
            // future opens.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            // Now it is safe to physically delete the obsolete files.
            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            // Append a human-readable summary of the commit to LOG.
            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                for (seq, family, ssts, obsolete) in new_meta_info {
                    writeln!(log, "{seq:08} META family:{family}",)?;
                    for (seq, min, max, size) in ssts {
                        writeln!(
                            log,
                            " {seq:08} SST {min:016x}-{max:016x} {} MiB",
                            size / 1024 / 1024
                        )?;
                    }
                    for seq in obsolete {
                        writeln!(log, " {seq:08} OBSOLETE SST")?;
                    }
                }
                new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_sst_files.iter() {
                    writeln!(log, "{seq:08} NEW SST")?;
                }
                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                for (seq, _) in new_blob_files.iter() {
                    writeln!(log, "{seq:08} NEW BLOB")?;
                }
                for seq in sst_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} SST DELETED")?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} META DELETED")?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    writeln!(log, "{seq:08} BLOB DELETED")?;
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }
649
650 pub fn full_compact(&self) -> Result<()> {
653 self.compact(&CompactConfig {
654 min_merge_count: 2,
655 optimal_merge_count: usize::MAX,
656 max_merge_count: usize::MAX,
657 max_merge_bytes: u64::MAX,
658 min_merge_duplication_bytes: 0,
659 optimal_merge_duplication_bytes: u64::MAX,
660 max_merge_segment_count: usize::MAX,
661 })?;
662 Ok(())
663 }
664
    /// Runs a single compaction pass according to `compact_config`.
    ///
    /// Returns `true` when anything was merged and committed, `false` when
    /// there was nothing to do. Like write batches, compaction claims the
    /// exclusive `active_write_operation` slot for its duration.
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        // Claim the single-writer slot; fails if a write batch or another
        // compaction is running.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            // Compaction reads the current state under the read lock; new
            // sequence numbers are handed out via the atomic counter.
            let inner = self.inner.read();
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }
726
    /// Performs the compaction work for all families.
    ///
    /// Per family, overlapping SST files are grouped into merge segments
    /// (see `get_merge_segments`); each multi-file segment is merged via a
    /// k-way merge into new, size-bounded SST files, single-file segments
    /// are carried over unchanged, and one new meta file is written per
    /// family. All created and obsoleted files are reported through the
    /// `new_*` / `*_to_delete` out-parameters; new sequence numbers are
    /// drawn from `sequence_number`.
    fn compact_internal(
        &self,
        meta_files: &[MetaFile],
        sequence_number: &AtomicU32,
        new_meta_files: &mut Vec<(u32, File)>,
        new_sst_files: &mut Vec<(u32, File)>,
        sst_seq_numbers_to_delete: &mut Vec<u32>,
        blob_seq_numbers_to_delete: &mut Vec<u32>,
        keys_written: &mut u64,
        compact_config: &CompactConfig,
    ) -> Result<()> {
        if meta_files.is_empty() {
            return Ok(());
        }

        // Flattened view of one SST entry, remembering where it lives inside
        // `meta_files`.
        struct SstWithRange {
            meta_index: usize,
            index_in_meta: u32,
            seq: u32,
            range: StaticSortedFileRange,
            size: u64,
        }

        impl Compactable for SstWithRange {
            fn range(&self) -> RangeInclusive<u64> {
                self.range.min_hash..=self.range.max_hash
            }

            fn size(&self) -> u64 {
                self.size
            }
        }

        // Collect every SST entry from every meta file.
        let ssts_with_ranges = meta_files
            .iter()
            .enumerate()
            .flat_map(|(meta_index, meta)| {
                meta.entries()
                    .iter()
                    .enumerate()
                    .map(move |(index_in_meta, entry)| SstWithRange {
                        meta_index,
                        index_in_meta: index_in_meta as u32,
                        seq: entry.sequence_number(),
                        range: entry.range(),
                        size: entry.size(),
                    })
            })
            .collect::<Vec<_>>();

        // Families are numbered densely, so the maximum determines the count.
        let families = ssts_with_ranges
            .iter()
            .map(|s| s.range.family)
            .max()
            .unwrap() as usize
            + 1;

        let mut sst_by_family = Vec::with_capacity(families);
        sst_by_family.resize_with(families, Vec::new);

        for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
        }

        let key_block_cache = &self.key_block_cache;
        let value_block_cache = &self.value_block_cache;
        let path = &self.path;

        // Serializes LOG writes issued by the per-family jobs below.
        let log_mutex = Mutex::new(());

        // Result of compacting a single family.
        struct PartialResultPerFamily {
            new_meta_file: Option<(u32, File)>,
            new_sst_files: Vec<(u32, File)>,
            sst_seq_numbers_to_delete: Vec<u32>,
            blob_seq_numbers_to_delete: Vec<u32>,
            keys_written: u64,
        }

        // Plan merge segments per family; `max_merge_segment_count` is a
        // budget shared across all families.
        let mut compact_config = compact_config.clone();
        let merge_jobs = sst_by_family
            .into_iter()
            .enumerate()
            .filter_map(|(family, ssts_with_ranges)| {
                if compact_config.max_merge_segment_count == 0 {
                    return None;
                }
                let (merge_jobs, real_merge_job_size) =
                    get_merge_segments(&ssts_with_ranges, &compact_config);
                compact_config.max_merge_segment_count -= real_merge_job_size;
                Some((family, ssts_with_ranges, merge_jobs))
            })
            .collect::<Vec<_>>();

        // Compact all families in parallel.
        let result = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                merge_jobs,
                |(family, ssts_with_ranges, merge_jobs)| {
                    let family = family as u32;

                    if merge_jobs.is_empty() {
                        // Nothing to merge in this family.
                        return Ok(PartialResultPerFamily {
                            new_meta_file: None,
                            new_sst_files: Vec::new(),
                            sst_seq_numbers_to_delete: Vec::new(),
                            blob_seq_numbers_to_delete: Vec::new(),
                            keys_written: 0,
                        });
                    }

                    // Log the planned merges for this family.
                    self.parallel_scheduler.block_in_place(|| {
                        let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
                        let guard = log_mutex.lock();
                        let mut log = self.open_log()?;
                        writeln!(
                            log,
                            "Compaction for family {family} (coverage: {}, overlap: {}, \
                             duplication: {} / {} MiB):",
                            metrics.coverage,
                            metrics.overlap,
                            metrics.duplication,
                            metrics.duplicated_size / 1024 / 1024
                        )?;
                        for job in merge_jobs.iter() {
                            writeln!(log, " merge")?;
                            for i in job.iter() {
                                let seq = ssts_with_ranges[*i].seq;
                                let (min, max) = ssts_with_ranges[*i].range().into_inner();
                                writeln!(log, " {seq:08} {min:016x}-{max:016x}")?;
                            }
                        }
                        drop(guard);
                        anyhow::Ok(())
                    })?;

                    // Only multi-file segments consume their inputs;
                    // single-file segments are moved, not deleted.
                    let sst_seq_numbers_to_delete = merge_jobs
                        .iter()
                        .filter(|l| l.len() > 1)
                        .flat_map(|l| l.iter().copied())
                        .map(|index| ssts_with_ranges[index].seq)
                        .collect::<Vec<_>>();

                    let span = tracing::trace_span!("merge files");
                    // Outcome of processing one merge segment.
                    enum PartialMergeResult<'l> {
                        Merged {
                            new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                            blob_seq_numbers_to_delete: Vec<u32>,
                            keys_written: u64,
                        },
                        Move {
                            seq: u32,
                            meta: StaticSortedFileBuilderMeta<'l>,
                        },
                    }
                    let merge_result = self
                        .parallel_scheduler
                        .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
                            let _span = span.clone().entered();
                            if indices.len() == 1 {
                                // Single file: keep it and carry its metadata
                                // into the new meta file unchanged
                                // (`entries: 0` marks a moved entry).
                                let index = indices[0];
                                let meta_index = ssts_with_ranges[index].meta_index;
                                let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                let meta_file = &meta_files[meta_index];
                                let entry = meta_file.entry(index_in_meta);
                                let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
                                let meta = StaticSortedFileBuilderMeta {
                                    min_hash: entry.min_hash(),
                                    max_hash: entry.max_hash(),
                                    amqf,
                                    key_compression_dictionary_length: entry
                                        .key_compression_dictionary_length(),
                                    block_count: entry.block_count(),
                                    size: entry.size(),
                                    entries: 0,
                                };
                                return Ok(PartialMergeResult::Move {
                                    seq: entry.sequence_number(),
                                    meta,
                                });
                            }

                            // Writes `entries` into a new `{seq:08}.sst` file.
                            fn create_sst_file<'l, S: ParallelScheduler>(
                                parallel_scheduler: &S,
                                entries: &[LookupEntry<'l>],
                                total_key_size: usize,
                                path: &Path,
                                seq: u32,
                            ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
                            {
                                let _span = tracing::trace_span!("write merged sst file").entered();
                                let (meta, file) = parallel_scheduler.block_in_place(|| {
                                    write_static_stored_file(
                                        entries,
                                        total_key_size,
                                        &path.join(format!("{seq:08}.sst")),
                                    )
                                })?;
                                Ok((seq, file, meta))
                            }

                            let mut new_sst_files = Vec::new();

                            // One sorted iterator per input SST file.
                            let iters = indices
                                .iter()
                                .map(|&index| {
                                    let meta_index = ssts_with_ranges[index].meta_index;
                                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
                                    let meta = &meta_files[meta_index];
                                    meta.entry(index_in_meta)
                                        .sst(meta)?
                                        .iter(key_block_cache, value_block_cache)
                                })
                                .collect::<Result<Vec<_>>>()?;

                            let iter = MergeIter::new(iters.into_iter())?;

                            // Currently always empty: blob garbage collection
                            // is not performed during compaction here.
                            let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();

                            let mut keys_written = 0;

                            // Split the merged, sorted stream into files
                            // bounded by data size and entry count.
                            // `last_entries` keeps the previously completed
                            // file so the final two files can be rebalanced.
                            let mut total_key_size = 0;
                            let mut total_value_size = 0;
                            let mut current: Option<LookupEntry<'_>> = None;
                            let mut entries = Vec::new();
                            let mut last_entries = Vec::new();
                            let mut last_entries_total_key_size = 0;
                            for entry in iter {
                                let entry = entry?;

                                // `current` lags one entry behind so
                                // duplicate keys can be detected.
                                if let Some(current) = current.take() {
                                    if current.key != entry.key {
                                        let key_size = current.key.len();
                                        let value_size = current.value.uncompressed_size_in_sst();
                                        total_key_size += key_size;
                                        total_value_size += value_size;

                                        if total_key_size + total_value_size
                                            > DATA_THRESHOLD_PER_COMPACTED_FILE
                                            || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
                                        {
                                            // Current file is full: flush the
                                            // previous one (if any) and start
                                            // a new one.
                                            let selected_total_key_size =
                                                last_entries_total_key_size;
                                            swap(&mut entries, &mut last_entries);
                                            last_entries_total_key_size = total_key_size - key_size;
                                            total_key_size = key_size;
                                            total_value_size = value_size;

                                            if !entries.is_empty() {
                                                let seq = sequence_number
                                                    .fetch_add(1, Ordering::SeqCst)
                                                    + 1;

                                                keys_written += entries.len() as u64;
                                                new_sst_files.push(create_sst_file(
                                                    &self.parallel_scheduler,
                                                    &entries,
                                                    selected_total_key_size,
                                                    path,
                                                    seq,
                                                )?);

                                                entries.clear();
                                            }
                                        }

                                        entries.push(current);
                                    } else {
                                        // Duplicate key: drop `current` so
                                        // the other occurrence wins —
                                        // presumably MergeIter yields the
                                        // winning entry last; confirm against
                                        // MergeIter's ordering.
                                    }
                                }
                                current = Some(entry);
                            }
                            if let Some(entry) = current {
                                total_key_size += entry.key.len();
                                entries.push(entry);
                            }

                            if last_entries.is_empty() && !entries.is_empty() {
                                // Everything fit into a single file.
                                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += entries.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    &entries,
                                    total_key_size,
                                    path,
                                    seq,
                                )?);
                            } else
                            if !last_entries.is_empty() {
                                // Rebalance the last two files so the final
                                // one is not tiny. The key size per half is
                                // only an estimate.
                                last_entries.append(&mut entries);

                                last_entries_total_key_size += total_key_size;

                                let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

                                let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                                let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

                                keys_written += part1.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part1,
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq1,
                                )?);

                                keys_written += part2.len() as u64;
                                new_sst_files.push(create_sst_file(
                                    &self.parallel_scheduler,
                                    part2,
                                    last_entries_total_key_size / 2,
                                    path,
                                    seq2,
                                )?);
                            }
                            Ok(PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written,
                            })
                        })
                        .with_context(|| {
                            format!("Failed to merge database files for family {family}")
                        })?;

                    // Pre-size the combined vectors from the partial results.
                    let Some((sst_files_len, blob_delete_len)) = merge_result
                        .iter()
                        .map(|r| {
                            if let PartialMergeResult::Merged {
                                new_sst_files,
                                blob_seq_numbers_to_delete,
                                keys_written: _,
                            } = r
                            {
                                (new_sst_files.len(), blob_seq_numbers_to_delete.len())
                            } else {
                                (0, 0)
                            }
                        })
                        .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
                    else {
                        // `merge_jobs` was checked non-empty above, so there
                        // is at least one result.
                        unreachable!()
                    };

                    let mut new_sst_files = Vec::with_capacity(sst_files_len);
                    let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);

                    let mut meta_file_builder = MetaFileBuilder::new(family);

                    // Combine merged and moved entries into one meta file.
                    let mut keys_written = 0;
                    for result in merge_result {
                        match result {
                            PartialMergeResult::Merged {
                                new_sst_files: merged_new_sst_files,
                                blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
                                keys_written: merged_keys_written,
                            } => {
                                for (seq, file, meta) in merged_new_sst_files {
                                    meta_file_builder.add(seq, meta);
                                    new_sst_files.push((seq, file));
                                }
                                blob_seq_numbers_to_delete
                                    .extend(merged_blob_seq_numbers_to_delete);
                                keys_written += merged_keys_written;
                            }
                            PartialMergeResult::Move { seq, meta } => {
                                meta_file_builder.add(seq, meta);
                            }
                        }
                    }

                    // Record merged-away inputs as obsolete in the meta file.
                    for &seq in sst_seq_numbers_to_delete.iter() {
                        meta_file_builder.add_obsolete_sst_file(seq);
                    }

                    let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let meta_file = {
                        let _span = tracing::trace_span!("write meta file").entered();
                        self.parallel_scheduler
                            .block_in_place(|| meta_file_builder.write(&self.path, seq))?
                    };

                    Ok(PartialResultPerFamily {
                        new_meta_file: Some((seq, meta_file)),
                        new_sst_files,
                        sst_seq_numbers_to_delete,
                        blob_seq_numbers_to_delete,
                        keys_written,
                    })
                },
            )?;

        // Fold the per-family results into the caller's out-parameters.
        for PartialResultPerFamily {
            new_meta_file: inner_new_meta_file,
            new_sst_files: mut inner_new_sst_files,
            sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
            blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
            keys_written: inner_keys_written,
        } in result
        {
            new_meta_files.extend(inner_new_meta_file);
            new_sst_files.append(&mut inner_new_sst_files);
            sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
            blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
            *keys_written += inner_keys_written;
        }

        Ok(())
    }
1153
    /// Looks up `key` in the given `family` and returns its value, if any.
    ///
    /// Meta files are probed newest-first, so the most recent write for a
    /// key wins; a tombstone (deleted entry) ends the search with `None`.
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.amqf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                // The cheap rejections below just move on to the next
                // (older) meta file.
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => match result {
                        LookupValue::Deleted => {
                            // Tombstone: the key was deleted by this (newer)
                            // file, so older files must not be consulted.
                            #[cfg(feature = "stats")]
                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                            return Ok(None);
                        }
                        LookupValue::Slice { value } => {
                            // Value stored inline in the SST file.
                            #[cfg(feature = "stats")]
                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                            return Ok(Some(value));
                        }
                        LookupValue::Blob { sequence_number } => {
                            // Large value stored in a separate blob file.
                            #[cfg(feature = "stats")]
                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                            let blob = self.read_blob(sequence_number)?;
                            return Ok(Some(blob));
                        }
                    },
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }
1210
    /// Returns a snapshot of cache and lookup statistics (only with the
    /// `stats` feature).
    #[cfg(feature = "stats")]
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            amqf_cache: CacheStatistics::new(&self.amqf_cache),
            // A "hit" is any lookup that resolved to a value, a tombstone,
            // or a blob.
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }
1231
1232 pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1233 Ok(self
1234 .inner
1235 .read()
1236 .meta_files
1237 .iter()
1238 .rev()
1239 .map(|meta_file| {
1240 let entries = meta_file
1241 .entries()
1242 .iter()
1243 .map(|entry| {
1244 let amqf = entry.raw_amqf(meta_file.amqf_data());
1245 MetaFileEntryInfo {
1246 sequence_number: entry.sequence_number(),
1247 min_hash: entry.min_hash(),
1248 max_hash: entry.max_hash(),
1249 sst_size: entry.size(),
1250 amqf_size: entry.amqf_size(),
1251 amqf_entries: amqf.len(),
1252 key_compression_dictionary_size: entry
1253 .key_compression_dictionary_length(),
1254 block_count: entry.block_count(),
1255 }
1256 })
1257 .collect();
1258 MetaFileInfo {
1259 sequence_number: meta_file.sequence_number(),
1260 family: meta_file.family(),
1261 obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1262 entries,
1263 }
1264 })
1265 .collect())
1266 }
1267
    /// Shuts the database down. Currently a no-op apart from optionally
    /// printing statistics.
    ///
    /// NOTE(review): `statistics()` only exists with the `stats` feature, so
    /// `print_stats` presumably implies `stats` in the crate's feature
    /// definitions — confirm, otherwise this fails to compile with
    /// `print_stats` alone.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
1274}
1275
/// Debugging snapshot of one meta file, produced by `meta_info`.
pub struct MetaFileInfo {
    /// Sequence number of the meta file.
    pub sequence_number: u32,
    /// Family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files this meta file marked obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// One entry per SST file referenced by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
1282
/// Debugging snapshot of one SST entry within a meta file.
pub struct MetaFileEntryInfo {
    /// Sequence number of the SST file.
    pub sequence_number: u32,
    /// Smallest key hash covered by the SST file.
    pub min_hash: u64,
    /// Largest key hash covered by the SST file.
    pub max_hash: u64,
    /// Size of the serialized AMQF filter in bytes.
    pub amqf_size: u32,
    /// Number of entries in the AMQF filter.
    pub amqf_entries: usize,
    /// Size of the SST file in bytes.
    pub sst_size: u64,
    /// Length of the key compression dictionary.
    pub key_compression_dictionary_size: u16,
    /// Number of blocks in the SST file.
    pub block_count: u16,
}