1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{self, File, OpenOptions, ReadDir},
5 io::{BufWriter, Write},
6 mem::{swap, take},
7 ops::RangeInclusive,
8 path::{Path, PathBuf},
9 sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use dashmap::DashSet;
15use jiff::Timestamp;
16use memmap2::Mmap;
17use nohash_hasher::BuildNoHashHasher;
18use parking_lot::{Mutex, RwLock};
19use smallvec::SmallVec;
20
21pub use crate::compaction::selector::CompactConfig;
22use crate::{
23 QueryKey,
24 arc_slice::ArcSlice,
25 compaction::selector::{Compactable, get_merge_segments},
26 compression::decompress_into_arc,
27 constants::{
28 AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
29 KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
30 VALUE_BLOCK_CACHE_SIZE,
31 },
32 key::{StoreKey, hash_key},
33 lookup_entry::{LookupEntry, LookupValue},
34 merge_iter::MergeIter,
35 meta_file::{AmqfCache, MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
36 meta_file_builder::MetaFileBuilder,
37 parallel_scheduler::ParallelScheduler,
38 sst_filter::SstFilter,
39 static_sorted_file::{BlockCache, SstLookupResult},
40 static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
41 write_batch::{FinishResult, WriteBatch},
42};
43
#[cfg(feature = "stats")]
#[derive(Debug)]
/// Point-in-time statistics snapshot of a single in-memory cache.
pub struct CacheStatistics {
    /// Fraction of lookups that were hits (`hits / (hits + misses)`).
    pub hit_rate: f32,
    /// Used weight relative to the cache capacity.
    pub fill: f32,
    /// Number of items currently stored in the cache.
    pub items: usize,
    /// Total weight of the stored items.
    pub size: u64,
    /// Number of cache hits so far.
    pub hits: u64,
    /// Number of cache misses so far.
    pub misses: u64,
}
54
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Snapshots hit/miss/size statistics from a `quick_cache` cache.
    ///
    /// Works for any cache configuration (key, value, weighter, hasher and
    /// lifecycle are generic).
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        let accesses = hits + misses;
        Self {
            // Guard against 0/0 producing NaN when the cache has never been
            // accessed; report a hit rate of 0 instead.
            hit_rate: if accesses == 0 {
                0.0
            } else {
                hits as f32 / accesses as f32
            },
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
78
#[cfg(feature = "stats")]
#[derive(Debug)]
/// Aggregated statistics for the whole database (caches plus lookup counters).
pub struct Statistics {
    /// Number of meta files currently loaded.
    pub meta_files: usize,
    /// Number of SST files referenced by the loaded meta files.
    pub sst_files: usize,
    /// Statistics of the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics of the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Statistics of the AMQF (approximate membership query filter) cache.
    pub amqf_cache: CacheStatistics,
    /// Total successful lookups.
    pub hits: u64,
    /// Total failed lookups.
    pub misses: u64,
    // Breakdown of where lookups were rejected (see `TrackedStats`).
    pub miss_family: u64,
    pub miss_range: u64,
    pub miss_amqf: u64,
    pub miss_key: u64,
}
94
#[cfg(feature = "stats")]
#[derive(Default)]
/// Atomic counters incremented during lookups, broken down by outcome.
/// Presumably the `hits_*` counters distinguish the kind of value found and
/// the `miss_*` counters the stage at which a lookup was rejected — confirm
/// against the lookup implementation.
struct TrackedStats {
    hits_deleted: std::sync::atomic::AtomicU64,
    hits_small: std::sync::atomic::AtomicU64,
    hits_blob: std::sync::atomic::AtomicU64,
    miss_family: std::sync::atomic::AtomicU64,
    miss_range: std::sync::atomic::AtomicU64,
    miss_amqf: std::sync::atomic::AtomicU64,
    miss_key: std::sync::atomic::AtomicU64,
    miss_global: std::sync::atomic::AtomicU64,
}
107
/// A persistent key-value store backed by SST/meta/blob files on disk,
/// generic over the parallel scheduler `S` and the number of key families.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    // Scheduler used to parallelize file I/O and merging work.
    parallel_scheduler: S,
    // Directory that contains all database files (CURRENT, LOG, *.sst, …).
    path: PathBuf,
    // When set, no files are ever created, modified or deleted.
    read_only: bool,
    // Mutable database state (loaded meta files, sequence counter, …).
    inner: RwLock<Inner<FAMILIES>>,
    // Single-writer guard: set while a write batch or compaction is active.
    active_write_operation: AtomicBool,
    // Cache for deserialized AMQF filters.
    amqf_cache: AmqfCache,
    // Cache for decompressed key blocks of SST files.
    key_block_cache: BlockCache,
    // Cache for decompressed value blocks of SST files.
    value_block_cache: BlockCache,
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
132
/// Mutable state of the database, protected by the `RwLock` in
/// [`TurboPersistence`].
struct Inner<const FAMILIES: usize> {
    // All currently visible meta files, ordered by sequence number.
    meta_files: Vec<MetaFile>,
    // Highest committed sequence number (mirrors the CURRENT file).
    current_sequence_number: u32,
    // Per-family set of key hashes accessed since the last commit; drained
    // into an AMQF in `commit_write_batch`.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}
144
/// Everything `commit` needs to atomically publish a set of changes:
/// newly written files (by sequence number), files to delete, the new
/// sequence number and the number of keys written (for logging).
pub struct CommitOptions {
    new_meta_files: Vec<(u32, File)>,
    new_sst_files: Vec<(u32, File)>,
    new_blob_files: Vec<(u32, File)>,
    sst_seq_numbers_to_delete: Vec<u32>,
    blob_seq_numbers_to_delete: Vec<u32>,
    sequence_number: u32,
    keys_written: u64,
}
154
155impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
156 pub fn open(path: PathBuf) -> Result<Self> {
161 Self::open_with_parallel_scheduler(path, Default::default())
162 }
163
164 pub fn open_read_only(path: PathBuf) -> Result<Self> {
167 Self::open_read_only_with_parallel_scheduler(path, Default::default())
168 }
169}
170
171impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
172 fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
173 Self {
174 parallel_scheduler,
175 path,
176 read_only,
177 inner: RwLock::new(Inner {
178 meta_files: Vec::new(),
179 current_sequence_number: 0,
180 accessed_key_hashes: [(); FAMILIES]
181 .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
182 }),
183 active_write_operation: AtomicBool::new(false),
184 amqf_cache: AmqfCache::with(
185 AMQF_CACHE_SIZE as usize / AMQF_AVG_SIZE,
186 AMQF_CACHE_SIZE,
187 Default::default(),
188 Default::default(),
189 Default::default(),
190 ),
191 key_block_cache: BlockCache::with(
192 KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
193 KEY_BLOCK_CACHE_SIZE,
194 Default::default(),
195 Default::default(),
196 Default::default(),
197 ),
198 value_block_cache: BlockCache::with(
199 VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
200 VALUE_BLOCK_CACHE_SIZE,
201 Default::default(),
202 Default::default(),
203 Default::default(),
204 ),
205 #[cfg(feature = "stats")]
206 stats: TrackedStats::default(),
207 }
208 }
209
210 pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
215 let mut db = Self::new(path, false, parallel_scheduler);
216 db.open_directory(false)?;
217 Ok(db)
218 }
219
220 pub fn open_read_only_with_parallel_scheduler(
223 path: PathBuf,
224 parallel_scheduler: S,
225 ) -> Result<Self> {
226 let mut db = Self::new(path, true, parallel_scheduler);
227 db.open_directory(false)?;
228 Ok(db)
229 }
230
231 fn open_directory(&mut self, read_only: bool) -> Result<()> {
233 match fs::read_dir(&self.path) {
234 Ok(entries) => {
235 if !self
236 .load_directory(entries, read_only)
237 .context("Loading persistence directory failed")?
238 {
239 if read_only {
240 bail!("Failed to open database");
241 }
242 self.init_directory()
243 .context("Initializing persistence directory failed")?;
244 }
245 Ok(())
246 }
247 Err(e) => {
248 if !read_only && e.kind() == std::io::ErrorKind::NotFound {
249 self.create_and_init_directory()
250 .context("Creating and initializing persistence directory failed")?;
251 Ok(())
252 } else {
253 Err(e).context("Failed to open database")
254 }
255 }
256 }
257 }
258
259 fn create_and_init_directory(&mut self) -> Result<()> {
261 fs::create_dir_all(&self.path)?;
262 self.init_directory()
263 }
264
265 fn init_directory(&mut self) -> Result<()> {
267 let mut current = File::create(self.path.join("CURRENT"))?;
268 current.write_u32::<BE>(0)?;
269 current.flush()?;
270 Ok(())
271 }
272
    /// Loads the persistence directory contents into `self.inner`.
    ///
    /// Returns `Ok(false)` when the directory has no `CURRENT` file (the
    /// database was never initialized) and we are not read-only; `Ok(true)`
    /// on success. In write mode this also cleans up files from incomplete
    /// commits (sequence number greater than `CURRENT`) and files referenced
    /// by `.del` files.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        // CURRENT holds the highest committed sequence number (big-endian).
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Database files are named `{sequence_number:08}.{ext}`.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                // NOTE(review): this only skips files whose `.del` entry was
                // read *earlier* in the directory iteration; files seen
                // before their `.del` file are instead filtered out via
                // `deleted_files` below (meta) or deleted in the `.del`
                // handling itself.
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Leftover from an incomplete commit — discard it.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A `.del` file is a list of big-endian u32
                            // sequence numbers whose files must be deleted.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            // Only remove the `.del` file once none of the
                            // referenced files existed anymore; deletions
                            // performed in this run keep the `.del` file so
                            // they are retried after a crash.
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // Data files; referenced via meta files, nothing
                            // to do here.
                        }
                        _ => {
                            // Tolerate hidden files (e.g. `.DS_Store`), fail
                            // on anything else unexpected.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already handled above.
                    }
                    Some("LOG") => {
                        // Human-readable commit log; not loaded.
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        // Open all meta files in parallel.
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Apply the SST filter from newest to oldest — presumably dropping
        // SST references superseded by newer meta files (see `SstFilter`).
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
387
    /// Reads and decompresses the blob file with the given sequence number.
    ///
    /// The file layout is a big-endian u32 uncompressed length followed by
    /// the compressed payload.
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcSlice<u8>> {
        let path = self.path.join(format!("{seq:08}.blob"));
        // SAFETY(review): mapping is sound only if the file is not truncated
        // or mutated while mapped — blob files appear to be written once and
        // only ever deleted afterwards; confirm no in-place modification.
        let mmap = unsafe { Mmap::map(&File::open(&path)?)? };
        // Hint the OS about the access pattern: one sequential read-through.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::DontFork)?;
        #[cfg(target_os = "linux")]
        mmap.advise(memmap2::Advice::Unmergeable)?;
        let mut compressed = &mmap[..];
        let uncompressed_length = compressed.read_u32::<BE>()?;

        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
        Ok(ArcSlice::from(buffer))
    }
407
408 pub fn is_empty(&self) -> bool {
410 self.inner.read().meta_files.is_empty()
411 }
412
413 pub fn write_batch<K: StoreKey + Send + Sync + 'static>(
418 &self,
419 ) -> Result<WriteBatch<K, S, FAMILIES>> {
420 if self.read_only {
421 bail!("Cannot write to a read-only database");
422 }
423 if self
424 .active_write_operation
425 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
426 .is_err()
427 {
428 bail!(
429 "Another write batch or compaction is already active (Only a single write \
430 operations is allowed at a time)"
431 );
432 }
433 let current = self.inner.read().current_sequence_number;
434 Ok(WriteBatch::new(
435 self.path.clone(),
436 current,
437 self.parallel_scheduler.clone(),
438 ))
439 }
440
441 fn open_log(&self) -> Result<BufWriter<File>> {
442 if self.read_only {
443 unreachable!("Only write operations can open the log file");
444 }
445 let log_path = self.path.join("LOG");
446 let log_file = OpenOptions::new()
447 .create(true)
448 .append(true)
449 .open(log_path)?;
450 Ok(BufWriter::new(log_file))
451 }
452
    /// Finishes a write batch and commits its files, making the data visible.
    ///
    /// As part of finishing, each family's `accessed_key_hashes` set is
    /// drained into an AMQF that is handed to the write batch.
    ///
    /// NOTE(review): if `finish` or `commit` fails, `active_write_operation`
    /// is never reset and all further writes are blocked — confirm whether
    /// this poisoning on error is intentional.
    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // ~5% headroom over the current number of hashes (len * 20/19).
            let initial_capacity = set.len() * 20 / 19;
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // Move every hash into the filter while clearing the set:
            // returning `false` from `retain` removes each visited element.
            set.retain(|hash| {
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        // Release the single-writer slot claimed by `write_batch()`.
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }
501
    /// Atomically publishes a set of new files and deletions.
    ///
    /// Crash-safety ordering: (1) fsync all new files, (2) update the
    /// in-memory state, (3) write and fsync a `.del` file listing files to
    /// delete (if any), (4) update and fsync `CURRENT`, (5) physically delete
    /// the obsolete files, (6) append a human-readable entry to `LOG`.
    /// A crash before (4) leaves the old state intact; a crash after (4) is
    /// repaired on the next open via the `.del` file.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        // Ensure every new file is durable before it becomes referenced.
        let sync_span = tracing::info_span!("sync new files").entered();
        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Filter the new meta files from newest to oldest (see `SstFilter`).
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;
        drop(sync_span);

        // Snapshot the data needed for the LOG entry before the meta files
        // are moved into `inner`.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        {
            // Swap the new state in under the write lock.
            let mut inner = self.inner.write();
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // Walk newest-to-oldest (hence the reverses) and drop meta files
            // that became fully obsolete, collecting their sequence numbers.
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            if has_delete_file {
                // The `.del` file gets its own sequence number.
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            if has_delete_file {
                // Persist the deletion intent *before* touching CURRENT so a
                // crash can finish the deletions on the next open.
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            // Publishing point: once CURRENT is durable the commit is final.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            // Append a human-readable summary to LOG (best-effort diagnostics).
            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                // Helper: print sequence numbers 15 per line with a label.
                fn write_seq_numbers<W: std::io::Write, T, I>(
                    log: &mut W,
                    items: I,
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()>
                where
                    I: IntoIterator<Item = T>,
                {
                    let items: Vec<T> = items.into_iter().collect();
                    for chunk in items.chunks(15) {
                        write!(log, " | |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(&mut log, sst_seq_numbers_to_delete, "SST DELETED", |&seq| {
                    seq
                })?;
                write_seq_numbers(
                    &mut log,
                    meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                #[cfg(feature = "verbose_log")]
                {
                    // Dump the full post-commit state, grouped by family.
                    writeln!(log, "New database state:")?;
                    writeln!(log, "FAM | META SEQ | SST SEQ FLAGS | RANGE")?;
                    let inner = self.inner.read();
                    let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                        let mut set = HashSet::new();
                        move |family| set.insert(*family)
                    });
                    for family in families {
                        for meta in inner.meta_files.iter() {
                            if meta.family() != family {
                                continue;
                            }
                            let meta_seq = meta.sequence_number();
                            for entry in meta.entries().iter() {
                                let seq = entry.sequence_number();
                                let range = entry.range();
                                writeln!(
                                    log,
                                    "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                    entry.flags(),
                                    range_to_str(range.min_hash, range.max_hash)
                                )?;
                            }
                        }
                    }
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }
736
737 pub fn full_compact(&self) -> Result<()> {
740 self.compact(&CompactConfig {
741 min_merge_count: 2,
742 optimal_merge_count: usize::MAX,
743 max_merge_count: usize::MAX,
744 max_merge_bytes: u64::MAX,
745 min_merge_duplication_bytes: 0,
746 optimal_merge_duplication_bytes: u64::MAX,
747 max_merge_segment_count: usize::MAX,
748 })?;
749 Ok(())
750 }
751
752 pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
757 if self.read_only {
758 bail!("Compaction is not allowed on a read only database");
759 }
760 let _span = tracing::info_span!("compact database").entered();
761 if self
762 .active_write_operation
763 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
764 .is_err()
765 {
766 bail!(
767 "Another write batch or compaction is already active (Only a single write \
768 operations is allowed at a time)"
769 );
770 }
771
772 let mut sequence_number;
773 let mut new_meta_files = Vec::new();
774 let mut new_sst_files = Vec::new();
775 let mut sst_seq_numbers_to_delete = Vec::new();
776 let mut blob_seq_numbers_to_delete = Vec::new();
777 let mut keys_written = 0;
778
779 {
780 let inner = self.inner.read();
781 sequence_number = AtomicU32::new(inner.current_sequence_number);
782 self.compact_internal(
783 &inner.meta_files,
784 &sequence_number,
785 &mut new_meta_files,
786 &mut new_sst_files,
787 &mut sst_seq_numbers_to_delete,
788 &mut blob_seq_numbers_to_delete,
789 &mut keys_written,
790 compact_config,
791 )
792 .context("Failed to compact database")?;
793 }
794
795 let has_changes = !new_meta_files.is_empty();
796 if has_changes {
797 self.commit(CommitOptions {
798 new_meta_files,
799 new_sst_files,
800 new_blob_files: Vec::new(),
801 sst_seq_numbers_to_delete,
802 blob_seq_numbers_to_delete,
803 sequence_number: *sequence_number.get_mut(),
804 keys_written,
805 })
806 .context("Failed to commit the database compaction")?;
807 }
808
809 self.active_write_operation.store(false, Ordering::Release);
810
811 Ok(has_changes)
812 }
813
814 fn compact_internal(
816 &self,
817 meta_files: &[MetaFile],
818 sequence_number: &AtomicU32,
819 new_meta_files: &mut Vec<(u32, File)>,
820 new_sst_files: &mut Vec<(u32, File)>,
821 sst_seq_numbers_to_delete: &mut Vec<u32>,
822 blob_seq_numbers_to_delete: &mut Vec<u32>,
823 keys_written: &mut u64,
824 compact_config: &CompactConfig,
825 ) -> Result<()> {
826 if meta_files.is_empty() {
827 return Ok(());
828 }
829
830 struct SstWithRange {
831 meta_index: usize,
832 index_in_meta: u32,
833 seq: u32,
834 range: StaticSortedFileRange,
835 size: u64,
836 flags: MetaEntryFlags,
837 }
838
839 impl Compactable for SstWithRange {
840 fn range(&self) -> RangeInclusive<u64> {
841 self.range.min_hash..=self.range.max_hash
842 }
843
844 fn size(&self) -> u64 {
845 self.size
846 }
847
848 fn category(&self) -> u8 {
849 if self.flags.cold() { 1 } else { 0 }
852 }
853 }
854
855 let ssts_with_ranges = meta_files
856 .iter()
857 .enumerate()
858 .flat_map(|(meta_index, meta)| {
859 meta.entries()
860 .iter()
861 .enumerate()
862 .map(move |(index_in_meta, entry)| SstWithRange {
863 meta_index,
864 index_in_meta: index_in_meta as u32,
865 seq: entry.sequence_number(),
866 range: entry.range(),
867 size: entry.size(),
868 flags: entry.flags(),
869 })
870 })
871 .collect::<Vec<_>>();
872
873 let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
874
875 for sst in ssts_with_ranges {
876 sst_by_family[sst.range.family as usize].push(sst);
877 }
878
879 let key_block_cache = &self.key_block_cache;
880 let value_block_cache = &self.value_block_cache;
881 let path = &self.path;
882
883 let log_mutex = Mutex::new(());
884
885 struct PartialResultPerFamily {
886 new_meta_file: Option<(u32, File)>,
887 new_sst_files: Vec<(u32, File)>,
888 sst_seq_numbers_to_delete: Vec<u32>,
889 blob_seq_numbers_to_delete: Vec<u32>,
890 keys_written: u64,
891 }
892
893 let mut compact_config = compact_config.clone();
894 let merge_jobs = sst_by_family
895 .into_iter()
896 .enumerate()
897 .filter_map(|(family, ssts_with_ranges)| {
898 if compact_config.max_merge_segment_count == 0 {
899 return None;
900 }
901 let (merge_jobs, real_merge_job_size) =
902 get_merge_segments(&ssts_with_ranges, &compact_config);
903 compact_config.max_merge_segment_count -= real_merge_job_size;
904 Some((family, ssts_with_ranges, merge_jobs))
905 })
906 .collect::<Vec<_>>();
907
908 let mut used_key_hashes = [(); FAMILIES].map(|_| Vec::new());
909
910 {
911 for &(family, ..) in merge_jobs.iter() {
912 used_key_hashes[family].extend(
913 meta_files
914 .iter()
915 .filter(|m| m.family() == family as u32)
916 .filter_map(|meta_file| {
917 meta_file.deserialize_used_key_hashes_amqf().transpose()
918 })
919 .collect::<Result<Vec<_>>>()?,
920 );
921 }
922 }
923
924 let result = self
925 .parallel_scheduler
926 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
927 merge_jobs,
928 |(family, ssts_with_ranges, merge_jobs)| {
929 let family = family as u32;
930
931 if merge_jobs.is_empty() {
932 return Ok(PartialResultPerFamily {
933 new_meta_file: None,
934 new_sst_files: Vec::new(),
935 sst_seq_numbers_to_delete: Vec::new(),
936 blob_seq_numbers_to_delete: Vec::new(),
937 keys_written: 0,
938 });
939 }
940
941 let sst_seq_numbers_to_delete = merge_jobs
943 .iter()
944 .filter(|l| l.len() > 1)
945 .flat_map(|l| l.iter().copied())
946 .map(|index| ssts_with_ranges[index].seq)
947 .collect::<Vec<_>>();
948
949 let span = tracing::trace_span!("merge files");
951 enum PartialMergeResult<'l> {
952 Merged {
953 new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
954 blob_seq_numbers_to_delete: Vec<u32>,
955 keys_written: u64,
956 indicies: SmallVec<[usize; 1]>,
957 },
958 Move {
959 seq: u32,
960 meta: StaticSortedFileBuilderMeta<'l>,
961 },
962 }
963 let merge_result = self
964 .parallel_scheduler
965 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
966 merge_jobs,
967 |indicies| {
968 let _span = span.clone().entered();
969 if indicies.len() == 1 {
970 let index = indicies[0];
972 let meta_index = ssts_with_ranges[index].meta_index;
973 let index_in_meta = ssts_with_ranges[index].index_in_meta;
974 let meta_file = &meta_files[meta_index];
975 let entry = meta_file.entry(index_in_meta);
976 let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
977 let meta = StaticSortedFileBuilderMeta {
978 min_hash: entry.min_hash(),
979 max_hash: entry.max_hash(),
980 amqf,
981 key_compression_dictionary_length: entry
982 .key_compression_dictionary_length(),
983 block_count: entry.block_count(),
984 size: entry.size(),
985 flags: entry.flags(),
986 entries: 0,
987 };
988 return Ok(PartialMergeResult::Move {
989 seq: entry.sequence_number(),
990 meta,
991 });
992 }
993
994 fn create_sst_file<'l, S: ParallelScheduler>(
995 parallel_scheduler: &S,
996 entries: &[LookupEntry<'l>],
997 total_key_size: usize,
998 path: &Path,
999 seq: u32,
1000 flags: MetaEntryFlags,
1001 ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
1002 {
1003 let _span =
1004 tracing::trace_span!("write merged sst file").entered();
1005 let (meta, file) = parallel_scheduler.block_in_place(|| {
1006 write_static_stored_file(
1007 entries,
1008 total_key_size,
1009 &path.join(format!("{seq:08}.sst")),
1010 flags,
1011 )
1012 })?;
1013 Ok((seq, file, meta))
1014 }
1015
1016 let iters = indicies
1018 .iter()
1019 .map(|&index| {
1020 let meta_index = ssts_with_ranges[index].meta_index;
1021 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1022 let meta = &meta_files[meta_index];
1023 meta.entry(index_in_meta)
1024 .sst(meta)?
1025 .iter(key_block_cache, value_block_cache)
1026 })
1027 .collect::<Result<Vec<_>>>()?;
1028
1029 let iter = MergeIter::new(iters.into_iter())?;
1030
1031 let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
1034
1035 let mut keys_written = 0;
1036
1037 let mut current: Option<LookupEntry<'_>> = None;
1038
1039 #[derive(Default)]
1040 struct Collector<'l> {
1041 entries: Vec<LookupEntry<'l>>,
1042 total_key_size: usize,
1043 total_value_size: usize,
1044 last_entries: Vec<LookupEntry<'l>>,
1045 last_entries_total_key_size: usize,
1046 new_sst_files:
1047 Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1048 }
1049 let mut used_collector = Collector::default();
1050 let mut unused_collector = Collector::default();
1051 for entry in iter {
1052 let entry = entry?;
1053
1054 if let Some(current) = current.take() {
1056 if current.key != entry.key {
1057 let is_used =
1058 used_key_hashes[family as usize].iter().any(
1059 |amqf| amqf.contains_fingerprint(current.hash),
1060 );
1061 let collector = if is_used {
1062 &mut used_collector
1063 } else {
1064 &mut unused_collector
1065 };
1066 let key_size = current.key.len();
1067 let value_size =
1068 current.value.uncompressed_size_in_sst();
1069 collector.total_key_size += key_size;
1070 collector.total_value_size += value_size;
1071
1072 if collector.total_key_size + collector.total_value_size
1073 > DATA_THRESHOLD_PER_COMPACTED_FILE
1074 || collector.entries.len()
1075 >= MAX_ENTRIES_PER_COMPACTED_FILE
1076 {
1077 let selected_total_key_size =
1078 collector.last_entries_total_key_size;
1079 swap(
1080 &mut collector.entries,
1081 &mut collector.last_entries,
1082 );
1083 collector.last_entries_total_key_size =
1084 collector.total_key_size - key_size;
1085 collector.total_key_size = key_size;
1086 collector.total_value_size = value_size;
1087
1088 if !collector.entries.is_empty() {
1089 let seq = sequence_number
1090 .fetch_add(1, Ordering::SeqCst)
1091 + 1;
1092
1093 keys_written += collector.entries.len() as u64;
1094
1095 let mut flags = MetaEntryFlags::default();
1096 flags.set_cold(!is_used);
1097 collector.new_sst_files.push(create_sst_file(
1098 &self.parallel_scheduler,
1099 &collector.entries,
1100 selected_total_key_size,
1101 path,
1102 seq,
1103 flags,
1104 )?);
1105
1106 collector.entries.clear();
1107 }
1108 }
1109
1110 collector.entries.push(current);
1111 } else {
1112 }
1115 }
1116 current = Some(entry);
1117 }
1118 if let Some(entry) = current {
1119 let is_used = used_key_hashes[family as usize]
1120 .iter()
1121 .any(|amqf| amqf.contains_fingerprint(entry.hash));
1122 let collector = if is_used {
1123 &mut used_collector
1124 } else {
1125 &mut unused_collector
1126 };
1127
1128 collector.total_key_size += entry.key.len();
1129 collector.entries.push(entry);
1132 }
1133
1134 for (collector, flags) in [
1136 (&mut used_collector, MetaEntryFlags::WARM),
1137 (&mut unused_collector, MetaEntryFlags::COLD),
1138 ] {
1139 if collector.last_entries.is_empty()
1140 && !collector.entries.is_empty()
1141 {
1142 let seq =
1143 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1144
1145 keys_written += collector.entries.len() as u64;
1146 collector.new_sst_files.push(create_sst_file(
1147 &self.parallel_scheduler,
1148 &collector.entries,
1149 collector.total_key_size,
1150 path,
1151 seq,
1152 flags,
1153 )?);
1154 } else
1155 if !collector.last_entries.is_empty() {
1159 collector.last_entries.append(&mut collector.entries);
1160
1161 collector.last_entries_total_key_size +=
1162 collector.total_key_size;
1163
1164 let (part1, part2) = collector
1165 .last_entries
1166 .split_at(collector.last_entries.len() / 2);
1167
1168 let seq1 =
1169 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1170 let seq2 =
1171 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1172
1173 keys_written += part1.len() as u64;
1174 collector.new_sst_files.push(create_sst_file(
1175 &self.parallel_scheduler,
1176 part1,
1177 collector.last_entries_total_key_size / 2,
1179 path,
1180 seq1,
1181 flags,
1182 )?);
1183
1184 keys_written += part2.len() as u64;
1185 collector.new_sst_files.push(create_sst_file(
1186 &self.parallel_scheduler,
1187 part2,
1188 collector.last_entries_total_key_size / 2,
1189 path,
1190 seq2,
1191 flags,
1192 )?);
1193 }
1194 }
1195 let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1196 new_sst_files.append(&mut used_collector.new_sst_files);
1197 Ok(PartialMergeResult::Merged {
1198 new_sst_files,
1199 blob_seq_numbers_to_delete,
1200 keys_written,
1201 indicies,
1202 })
1203 },
1204 )
1205 .with_context(|| {
1206 format!("Failed to merge database files for family {family}")
1207 })?;
1208
1209 let Some((sst_files_len, blob_delete_len)) = merge_result
1210 .iter()
1211 .map(|r| {
1212 if let PartialMergeResult::Merged {
1213 new_sst_files,
1214 blob_seq_numbers_to_delete,
1215 indicies: _,
1216 keys_written: _,
1217 } = r
1218 {
1219 (new_sst_files.len(), blob_seq_numbers_to_delete.len())
1220 } else {
1221 (0, 0)
1222 }
1223 })
1224 .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
1225 else {
1226 unreachable!()
1227 };
1228
1229 let mut new_sst_files = Vec::with_capacity(sst_files_len);
1230 let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
1231
1232 let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1233 let mut meta_file_builder = MetaFileBuilder::new(family);
1234
1235 let mut keys_written = 0;
1236 self.parallel_scheduler.block_in_place(|| {
1237 let guard = log_mutex.lock();
1238 let mut log = self.open_log()?;
1239 writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
1240 for result in merge_result {
1241 match result {
1242 PartialMergeResult::Merged {
1243 new_sst_files: merged_new_sst_files,
1244 blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
1245 keys_written: merged_keys_written,
1246 indicies,
1247 } => {
1248 writeln!(
1249 log,
1250 "{family:3} | {meta_seq:08} | MERGE \
1251 ({merged_keys_written} keys):"
1252 )?;
1253 for i in indicies.iter() {
1254 let seq = ssts_with_ranges[*i].seq;
1255 let (min, max) = ssts_with_ranges[*i].range().into_inner();
1256 writeln!(
1257 log,
1258 "{family:3} | {meta_seq:08} | {seq:08} INPUT | {}",
1259 range_to_str(min, max)
1260 )?;
1261 }
1262 for (seq, file, meta) in merged_new_sst_files {
1263 let min = meta.min_hash;
1264 let max = meta.max_hash;
1265 writeln!(
1266 log,
1267 "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
1268 ({})",
1269 range_to_str(min, max),
1270 meta.flags
1271 )?;
1272
1273 meta_file_builder.add(seq, meta);
1274 new_sst_files.push((seq, file));
1275 }
1276 blob_seq_numbers_to_delete
1277 .extend(merged_blob_seq_numbers_to_delete);
1278 keys_written += merged_keys_written;
1279 }
1280 PartialMergeResult::Move { seq, meta } => {
1281 let min = meta.min_hash;
1282 let max = meta.max_hash;
1283 writeln!(
1284 log,
1285 "{family:3} | {meta_seq:08} | {seq:08} MOVED | {}",
1286 range_to_str(min, max)
1287 )?;
1288
1289 meta_file_builder.add(seq, meta);
1290 }
1291 }
1292 }
1293 drop(log);
1294 drop(guard);
1295
1296 anyhow::Ok(())
1297 })?;
1298
1299 for &seq in sst_seq_numbers_to_delete.iter() {
1300 meta_file_builder.add_obsolete_sst_file(seq);
1301 }
1302
1303 let meta_file = {
1304 let _span = tracing::trace_span!("write meta file").entered();
1305 self.parallel_scheduler
1306 .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
1307 };
1308
1309 Ok(PartialResultPerFamily {
1310 new_meta_file: Some((meta_seq, meta_file)),
1311 new_sst_files,
1312 sst_seq_numbers_to_delete,
1313 blob_seq_numbers_to_delete,
1314 keys_written,
1315 })
1316 },
1317 )?;
1318
1319 for PartialResultPerFamily {
1320 new_meta_file: inner_new_meta_file,
1321 new_sst_files: mut inner_new_sst_files,
1322 sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
1323 blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
1324 keys_written: inner_keys_written,
1325 } in result
1326 {
1327 new_meta_files.extend(inner_new_meta_file);
1328 new_sst_files.append(&mut inner_new_sst_files);
1329 sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
1330 blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
1331 *keys_written += inner_keys_written;
1332 }
1333
1334 Ok(())
1335 }
1336
    /// Looks up the value stored for `key` in the given key `family`.
    ///
    /// Meta files are scanned in reverse order so that more recently written
    /// entries shadow older ones; the first definitive answer (a found value or
    /// a deletion tombstone) ends the search. Returns `Ok(None)` when the key
    /// was deleted or never written.
    ///
    /// When the `stats` feature is enabled, each kind of miss/hit increments a
    /// dedicated counter on `self.stats`.
    pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
        debug_assert!(family < FAMILIES, "Family index out of bounds");
        let hash = hash_key(key);
        let inner = self.inner.read();
        // Iterate newest-to-oldest so the most recent write for a key wins.
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup(
                family as u32,
                hash,
                key,
                &self.amqf_cache,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                // This meta file covers a different family — keep searching.
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                // The key hash falls outside this file's covered hash range.
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                // The approximate-membership filter ruled the key out.
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(result) => {
                        // Record that this key hash was accessed.
                        // NOTE(review): presumably consumed by compaction for
                        // usage tracking — confirm against the writers of
                        // `accessed_key_hashes`.
                        inner.accessed_key_hashes[family].insert(hash);
                        match result {
                            LookupValue::Deleted => {
                                // Tombstone: a newer write deleted the key, so
                                // older meta files must not be consulted.
                                #[cfg(feature = "stats")]
                                self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                                return Ok(None);
                            }
                            LookupValue::Slice { value } => {
                                // Value is stored inline in the SST block.
                                #[cfg(feature = "stats")]
                                self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                                return Ok(Some(value));
                            }
                            LookupValue::Blob { sequence_number } => {
                                // Large value stored out-of-line; fetch it from
                                // the referenced blob file.
                                #[cfg(feature = "stats")]
                                self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                                let blob = self.read_blob(sequence_number)?;
                                return Ok(Some(blob));
                            }
                        }
                    }
                    // Filter/range matched but the key is not in the SST file.
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }
        // No meta file produced a definitive answer: the key does not exist.
        #[cfg(feature = "stats")]
        self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        Ok(None)
    }
1397
1398 #[cfg(feature = "stats")]
1400 pub fn statistics(&self) -> Statistics {
1401 let inner = self.inner.read();
1402 Statistics {
1403 meta_files: inner.meta_files.len(),
1404 sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
1405 key_block_cache: CacheStatistics::new(&self.key_block_cache),
1406 value_block_cache: CacheStatistics::new(&self.value_block_cache),
1407 amqf_cache: CacheStatistics::new(&self.amqf_cache),
1408 hits: self.stats.hits_deleted.load(Ordering::Relaxed)
1409 + self.stats.hits_small.load(Ordering::Relaxed)
1410 + self.stats.hits_blob.load(Ordering::Relaxed),
1411 misses: self.stats.miss_global.load(Ordering::Relaxed),
1412 miss_family: self.stats.miss_family.load(Ordering::Relaxed),
1413 miss_range: self.stats.miss_range.load(Ordering::Relaxed),
1414 miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
1415 miss_key: self.stats.miss_key.load(Ordering::Relaxed),
1416 }
1417 }
1418
1419 pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1420 Ok(self
1421 .inner
1422 .read()
1423 .meta_files
1424 .iter()
1425 .rev()
1426 .map(|meta_file| {
1427 let entries = meta_file
1428 .entries()
1429 .iter()
1430 .map(|entry| {
1431 let amqf = entry.raw_amqf(meta_file.amqf_data());
1432 MetaFileEntryInfo {
1433 sequence_number: entry.sequence_number(),
1434 min_hash: entry.min_hash(),
1435 max_hash: entry.max_hash(),
1436 sst_size: entry.size(),
1437 flags: entry.flags(),
1438 amqf_size: entry.amqf_size(),
1439 amqf_entries: amqf.len(),
1440 key_compression_dictionary_size: entry
1441 .key_compression_dictionary_length(),
1442 block_count: entry.block_count(),
1443 }
1444 })
1445 .collect();
1446 MetaFileInfo {
1447 sequence_number: meta_file.sequence_number(),
1448 family: meta_file.family(),
1449 obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1450 entries,
1451 }
1452 })
1453 .collect())
1454 }
1455
    /// Shuts down the database.
    ///
    /// From the code visible here this performs no cleanup; it only prints the
    /// collected statistics when the `print_stats` feature is enabled.
    /// Always returns `Ok(())`.
    pub fn shutdown(&self) -> Result<()> {
        #[cfg(feature = "print_stats")]
        println!("{:#?}", self.statistics());
        Ok(())
    }
1462}
1463
/// Renders a hash range as a fixed-width ASCII bar followed by the raw bounds.
///
/// The `u64` hash space is mapped onto `DISPLAY_SIZE` buckets; `[` and `]`
/// mark the endpoints (`O` when both fall into the same bucket) and `=` fills
/// the interior. The textual form `min-max` (zero-padded hex) is appended.
///
/// Bucket indices are clamped to `DISPLAY_SIZE - 1`: without the clamp a hash
/// of `u64::MAX` maps to bucket `DISPLAY_SIZE`, which lies outside the drawn
/// bar, so full-range files were rendered without their closing `]`.
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // Widen to u128 so `hash * DISPLAY_SIZE` cannot overflow.
    let start_pos = ((min as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize)
        .min(DISPLAY_SIZE - 1);
    let end_pos = ((max as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize)
        .min(DISPLAY_SIZE - 1);
    let mut range_str = String::with_capacity(DISPLAY_SIZE + 36);
    for i in 0..DISPLAY_SIZE {
        if i == start_pos && i == end_pos {
            range_str.push('O');
        } else if i == start_pos {
            range_str.push('[');
        } else if i == end_pos {
            range_str.push(']');
        } else if i > start_pos && i < end_pos {
            range_str.push('=');
        } else {
            range_str.push(' ');
        }
    }
    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
    range_str
}
1487
/// Diagnostic summary of a single meta file, as produced by `meta_info`.
pub struct MetaFileInfo {
    /// Sequence number of the meta file itself.
    pub sequence_number: u32,
    /// Key family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files this meta file marks as obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// Summary of each SST entry referenced by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
1494
/// Diagnostic summary of a single SST entry inside a meta file.
pub struct MetaFileEntryInfo {
    /// Sequence number of the SST file.
    pub sequence_number: u32,
    /// Smallest key hash covered by the SST file.
    pub min_hash: u64,
    /// Largest key hash covered by the SST file.
    pub max_hash: u64,
    /// Serialized size of the AMQF (approximate membership filter) in bytes.
    pub amqf_size: u32,
    /// Number of entries recorded in the AMQF.
    pub amqf_entries: usize,
    /// Size reported by the meta entry (from `entry.size()`) — presumably the
    /// SST file size in bytes.
    pub sst_size: u64,
    /// Flags recorded for this entry in the meta file.
    pub flags: MetaEntryFlags,
    /// Length of the key compression dictionary.
    pub key_compression_dictionary_size: u16,
    /// Block count reported by the meta entry.
    pub block_count: u16,
}