1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{self, File, OpenOptions, ReadDir},
5 io::{BufWriter, Write},
6 mem::take,
7 ops::RangeInclusive,
8 path::{Path, PathBuf},
9 sync::atomic::{AtomicBool, AtomicU32, Ordering},
10};
11
12use anyhow::{Context, Result, bail};
13use byteorder::{BE, ReadBytesExt, WriteBytesExt};
14use dashmap::DashSet;
15use jiff::Timestamp;
16use memmap2::Mmap;
17use nohash_hasher::BuildNoHashHasher;
18use parking_lot::{Mutex, RwLock};
19use smallvec::SmallVec;
20use tracing::span::EnteredSpan;
21
22pub use crate::compaction::selector::CompactConfig;
23use crate::{
24 DbConfig, FamilyKind, QueryKey,
25 arc_bytes::ArcBytes,
26 compaction::selector::{Compactable, get_merge_segments},
27 compression::{checksum_block, decompress_into_arc},
28 constants::{
29 DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE,
30 MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE,
31 },
32 key::{StoreKey, hash_key},
33 lookup_entry::{LazyLookupValue, LookupEntry, LookupValue},
34 merge_iter::MergeIter,
35 meta_file::{MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
36 meta_file_builder::MetaFileBuilder,
37 mmap_helper::advise_mmap_for_persistence,
38 parallel_scheduler::ParallelScheduler,
39 sst_filter::SstFilter,
40 static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile},
41 static_sorted_file_builder::{StaticSortedFileBuilderMeta, StreamingSstWriter},
42 write_batch::{FinishResult, WriteBatch},
43};
44
/// Point-in-time snapshot of one block cache's hit/miss and fill statistics.
///
/// Only compiled when the `stats` feature is enabled.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct CacheStatistics {
    /// Fraction of lookups that hit the cache: `hits / (hits + misses)`.
    pub hit_rate: f32,
    /// Fraction of the configured capacity currently used: `size / capacity`.
    pub fill: f32,
    /// Number of entries currently stored in the cache.
    pub items: usize,
    /// Total weight (bytes) currently stored in the cache.
    pub size: u64,
    /// Cumulative number of cache hits.
    pub hits: u64,
    /// Cumulative number of cache misses.
    pub misses: u64,
}
55
#[cfg(feature = "stats")]
impl CacheStatistics {
    /// Builds a statistics snapshot from a `quick_cache` cache.
    ///
    /// `hit_rate` is the fraction of lookups that were hits; it is reported
    /// as `0.0` (instead of NaN from `0.0 / 0.0`) when the cache has not
    /// been accessed yet. `fill` is the used weight relative to the
    /// configured capacity.
    fn new<Key, Val, We, B, L>(cache: &quick_cache::sync::Cache<Key, Val, We, B, L>) -> Self
    where
        Key: Eq + std::hash::Hash,
        Val: Clone,
        We: quick_cache::Weighter<Key, Val> + Clone,
        B: std::hash::BuildHasher + Clone,
        L: quick_cache::Lifecycle<Key, Val> + Clone,
    {
        let size = cache.weight();
        let hits = cache.hits();
        let misses = cache.misses();
        let accesses = hits + misses;
        Self {
            // Guard against NaN when there were no accesses at all.
            hit_rate: if accesses == 0 {
                0.0
            } else {
                hits as f32 / accesses as f32
            },
            fill: size as f32 / cache.capacity() as f32,
            items: cache.len(),
            size,
            hits,
            misses,
        }
    }
}
79
/// Aggregated database statistics, returned when the `stats` feature is on.
#[cfg(feature = "stats")]
#[derive(Debug)]
pub struct Statistics {
    /// Number of meta files currently loaded.
    pub meta_files: usize,
    /// Number of SST files referenced by the loaded meta files.
    pub sst_files: usize,
    /// Statistics for the key block cache.
    pub key_block_cache: CacheStatistics,
    /// Statistics for the value block cache.
    pub value_block_cache: CacheStatistics,
    /// Total successful lookups.
    pub hits: u64,
    /// Total failed lookups.
    pub misses: u64,
    /// Lookups rejected because the family had no data.
    pub miss_family: u64,
    /// Lookups rejected by the hash-range check of the SST files.
    pub miss_range: u64,
    /// Lookups rejected by the AMQF (approximate membership) filter.
    pub miss_amqf: u64,
    /// Lookups that passed the filters but did not find the key.
    pub miss_key: u64,
}
94
/// Internal atomic counters incremented on the lookup paths.
///
/// Only compiled when the `stats` feature is enabled; aggregated into
/// [`Statistics`] for reporting.
#[cfg(feature = "stats")]
#[derive(Default)]
struct TrackedStats {
    // Hits that resolved to a deleted entry (tombstone).
    hits_deleted: std::sync::atomic::AtomicU64,
    // Hits that resolved to a small inline value.
    hits_small: std::sync::atomic::AtomicU64,
    // Hits that resolved to a value stored in a blob file.
    hits_blob: std::sync::atomic::AtomicU64,
    // Misses short-circuited because the family had no data.
    miss_family: std::sync::atomic::AtomicU64,
    // Misses short-circuited by the SST hash-range check.
    miss_range: std::sync::atomic::AtomicU64,
    // Misses short-circuited by the AMQF filter.
    miss_amqf: std::sync::atomic::AtomicU64,
    // Misses where an SST was consulted but the key was absent.
    miss_key: std::sync::atomic::AtomicU64,
    // Misses after all files were exhausted.
    miss_global: std::sync::atomic::AtomicU64,
}
107
/// A persistent key-value store backed by meta/SST/blob files on disk.
///
/// `S` schedules parallel work; `FAMILIES` is the compile-time number of
/// key families the store is partitioned into.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
    // Scheduler used for parallel map/collect and blocking sections.
    parallel_scheduler: S,
    // Directory containing CURRENT, LOG, *.meta, *.sst, *.blob and *.del files.
    path: PathBuf,
    // When true, all mutating operations bail/are unreachable.
    read_only: bool,
    // Mutable state (loaded meta files, sequence number, accessed-key sets).
    inner: RwLock<Inner<FAMILIES>>,
    // Guard ensuring only one write batch or compaction runs at a time.
    active_write_operation: AtomicBool,
    // Cache for key blocks read from SST files.
    key_block_cache: BlockCache,
    // Cache for value blocks read from SST files.
    value_block_cache: BlockCache,
    // Per-family configuration (e.g. single- vs multi-value semantics).
    config: DbConfig<FAMILIES>,
    // Lookup counters, only tracked with the `stats` feature.
    #[cfg(feature = "stats")]
    stats: TrackedStats,
}
132
/// Mutable database state protected by the outer `RwLock`.
struct Inner<const FAMILIES: usize> {
    // Currently loaded meta files, ordered by sequence number.
    meta_files: Vec<MetaFile>,
    // Highest committed sequence number (mirrors the CURRENT file).
    current_sequence_number: u32,
    // Per-family sets of key hashes that were accessed; drained into an
    // AMQF filter when a write batch is committed.
    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
}
144
/// Inputs to a commit: freshly written files to make durable and visible,
/// plus the sequence numbers of files to delete afterwards.
pub struct CommitOptions {
    // New meta files as (sequence number, open file handle) pairs.
    new_meta_files: Vec<(u32, File)>,
    // New SST files as (sequence number, open file handle) pairs.
    new_sst_files: Vec<(u32, File)>,
    // New blob files as (sequence number, open file handle) pairs.
    new_blob_files: Vec<(u32, File)>,
    // Sequence numbers of SST files made obsolete by this commit.
    sst_seq_numbers_to_delete: Vec<u32>,
    // Sequence numbers of blob files made obsolete by this commit.
    blob_seq_numbers_to_delete: Vec<u32>,
    // Sequence number this commit was prepared at (may be bumped once more
    // if a `.del` file has to be written).
    sequence_number: u32,
    // Number of keys written, reported in the LOG file.
    keys_written: u64,
}
154
// Convenience constructors for schedulers that implement `Default`.
impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
    /// Opens (or creates) the database at `path` with default config and a
    /// default scheduler.
    pub fn open(path: PathBuf) -> Result<Self> {
        Self::open_with_parallel_scheduler(path, Default::default())
    }

    /// Opens (or creates) the database at `path` with the given config and a
    /// default scheduler.
    pub fn open_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, config, Default::default())
    }

    /// Opens the database at `path` read-only with the given config and a
    /// default scheduler.
    pub fn open_read_only_with_config(path: PathBuf, config: DbConfig<FAMILIES>) -> Result<Self> {
        Self::open_read_only_with_parallel_scheduler(path, config, Default::default())
    }
}
175
176impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
177 fn new(
178 path: PathBuf,
179 read_only: bool,
180 parallel_scheduler: S,
181 config: DbConfig<FAMILIES>,
182 ) -> Self {
183 Self {
184 parallel_scheduler,
185 path,
186 read_only,
187 inner: RwLock::new(Inner {
188 meta_files: Vec::new(),
189 current_sequence_number: 0,
190 accessed_key_hashes: [(); FAMILIES]
191 .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
192 }),
193 active_write_operation: AtomicBool::new(false),
194 key_block_cache: BlockCache::with(
195 KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE,
196 KEY_BLOCK_CACHE_SIZE,
197 Default::default(),
198 Default::default(),
199 Default::default(),
200 ),
201 value_block_cache: BlockCache::with(
202 VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE,
203 VALUE_BLOCK_CACHE_SIZE,
204 Default::default(),
205 Default::default(),
206 Default::default(),
207 ),
208 config,
209 #[cfg(feature = "stats")]
210 stats: TrackedStats::default(),
211 }
212 }
213
    /// Opens (or creates) the database at `path` with the default config and
    /// the given scheduler.
    pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result<Self> {
        Self::open_with_config_and_parallel_scheduler(path, DbConfig::default(), parallel_scheduler)
    }
221
    /// Opens the database at `path` for reading and writing, creating and
    /// initializing the directory if it does not exist yet.
    pub fn open_with_config_and_parallel_scheduler(
        path: PathBuf,
        config: DbConfig<FAMILIES>,
        parallel_scheduler: S,
    ) -> Result<Self> {
        let mut db = Self::new(path, false, parallel_scheduler, config);
        db.open_directory(false)?;
        Ok(db)
    }
232
233 fn open_read_only_with_parallel_scheduler(
236 path: PathBuf,
237 config: DbConfig<FAMILIES>,
238 parallel_scheduler: S,
239 ) -> Result<Self> {
240 let mut db = Self::new(path, true, parallel_scheduler, config);
241 db.open_directory(false)?;
242 Ok(db)
243 }
244
245 fn open_directory(&mut self, read_only: bool) -> Result<()> {
247 match fs::read_dir(&self.path) {
248 Ok(entries) => {
249 if !self
250 .load_directory(entries, read_only)
251 .context("Loading persistence directory failed")?
252 {
253 if read_only {
254 bail!("Failed to open database");
255 }
256 self.init_directory()
257 .context("Initializing persistence directory failed")?;
258 }
259 Ok(())
260 }
261 Err(e) => {
262 if !read_only && e.kind() == std::io::ErrorKind::NotFound {
263 self.create_and_init_directory()
264 .context("Creating and initializing persistence directory failed")?;
265 Ok(())
266 } else {
267 Err(e).context("Failed to open database")
268 }
269 }
270 }
271 }
272
    /// Creates the persistence directory (including parents) and writes the
    /// initial CURRENT file.
    fn create_and_init_directory(&mut self) -> Result<()> {
        fs::create_dir_all(&self.path)?;
        self.init_directory()
    }
278
279 fn init_directory(&mut self) -> Result<()> {
281 let mut current = File::create(self.path.join("CURRENT"))?;
282 current.write_u32::<BE>(0)?;
283 current.flush()?;
284 Ok(())
285 }
286
    /// Scans the persistence directory and loads the database state.
    ///
    /// Returns `Ok(false)` when no CURRENT file exists (nothing to load);
    /// `Ok(true)` once meta files are loaded and `self.inner` is populated.
    /// Unless `read_only`, this also cleans up: files with a sequence number
    /// above CURRENT (from an interrupted commit) are removed, and `.del`
    /// files are applied by deleting the files they list.
    fn load_directory(&mut self, entries: ReadDir, read_only: bool) -> Result<bool> {
        let mut meta_files = Vec::new();
        let mut current_file = match File::open(self.path.join("CURRENT")) {
            Ok(file) => file,
            Err(e) => {
                if !read_only && e.kind() == std::io::ErrorKind::NotFound {
                    return Ok(false);
                } else {
                    return Err(e).context("Failed to open CURRENT file");
                }
            }
        };
        // CURRENT holds the highest committed sequence number (big-endian u32).
        let current = current_file.read_u32::<BE>()?;
        drop(current_file);

        let mut deleted_files = HashSet::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                // Data files are named "{seq:08}.{ext}"; parse the sequence.
                let seq: u32 = path
                    .file_stem()
                    .context("File has no file stem")?
                    .to_str()
                    .context("File stem is not valid utf-8")?
                    .parse()?;
                if deleted_files.contains(&seq) {
                    continue;
                }
                if seq > current {
                    // Leftover from an uncommitted write; discard it.
                    if !read_only {
                        fs::remove_file(&path)?;
                    }
                } else {
                    match ext {
                        "meta" => {
                            meta_files.push(seq);
                        }
                        "del" => {
                            // A .del file lists sequence numbers whose files
                            // were logically deleted; remove any remains.
                            let mut content = &*fs::read(&path)?;
                            let mut no_existing_files = true;
                            while !content.is_empty() {
                                let seq = content.read_u32::<BE>()?;
                                deleted_files.insert(seq);
                                if !read_only {
                                    let sst_file = self.path.join(format!("{seq:08}.sst"));
                                    let meta_file = self.path.join(format!("{seq:08}.meta"));
                                    let blob_file = self.path.join(format!("{seq:08}.blob"));
                                    for path in [sst_file, meta_file, blob_file] {
                                        if fs::exists(&path)? {
                                            fs::remove_file(path)?;
                                            no_existing_files = false;
                                        }
                                    }
                                }
                            }
                            // Once all listed files are gone the .del file
                            // itself is no longer needed.
                            if !read_only && no_existing_files {
                                fs::remove_file(&path)?;
                            }
                        }
                        "blob" | "sst" => {
                            // Referenced from meta files; nothing to do here.
                        }
                        _ => {
                            // Tolerate hidden files (".…"), reject anything else.
                            if !path
                                .file_name()
                                .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                            {
                                bail!("Unexpected file in persistence directory: {:?}", path);
                            }
                        }
                    }
                }
            } else {
                match path.file_stem().and_then(|s| s.to_str()) {
                    Some("CURRENT") => {
                        // Already read above.
                    }
                    Some("LOG") => {
                        // Human-readable commit log; not loaded.
                    }
                    _ => {
                        if !path
                            .file_name()
                            .is_some_and(|s| s.as_encoded_bytes().starts_with(b"."))
                        {
                            bail!("Unexpected file in persistence directory: {:?}", path);
                        }
                    }
                }
            }
        }

        meta_files.retain(|seq| !deleted_files.contains(seq));
        meta_files.sort_unstable();
        // Open all meta files in parallel, preserving sequence order.
        let mut meta_files = self
            .parallel_scheduler
            .parallel_map_collect::<_, _, Result<Vec<MetaFile>>>(&meta_files, |&seq| {
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Apply the SST filter newest-first so newer meta files shadow
        // obsolete SSTs referenced by older ones.
        let mut sst_filter = SstFilter::new();
        for meta_file in meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        let inner = self.inner.get_mut();
        inner.meta_files = meta_files;
        inner.current_sequence_number = current;
        Ok(true)
    }
401
    /// Reads and decompresses the blob file with sequence number `seq`.
    ///
    /// The blob layout is: u32 uncompressed length, u32 checksum, followed by
    /// the compressed payload. Bails on a checksum mismatch (corruption).
    #[tracing::instrument(level = "info", name = "reading database blob", skip_all)]
    fn read_blob(&self, seq: u32) -> Result<ArcBytes> {
        let path = self.path.join(format!("{seq:08}.blob"));
        let file = File::open(&path)
            .with_context(|| format!("Failed to open blob file {}", path.display()))?;
        // SAFETY-relevant: mapping a file that another process truncates can
        // fault; blob files are written once and only removed via commits.
        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
            format!(
                "Failed to mmap blob file {} ({} bytes)",
                path.display(),
                file.metadata().map(|m| m.len()).unwrap_or(0)
            )
        })?;
        // Blobs are consumed front to back exactly once; hint that to the OS.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Sequential)?;
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::WillNeed)?;
        advise_mmap_for_persistence(&mmap)?;
        let mut reader = &mmap[..];
        let uncompressed_length = reader
            .read_u32::<BE>()
            .context("Failed to read uncompressed length from blob file")?;
        let expected_checksum = reader.read_u32::<BE>()?;

        // Checksum covers the remaining (compressed) bytes.
        let actual_checksum = checksum_block(reader);
        if actual_checksum != expected_checksum {
            bail!(
                "Cache corruption detected: checksum mismatch in blob file {:08}.blob (expected \
                 {:08x}, got {:08x})",
                seq,
                expected_checksum,
                actual_checksum
            );
        }

        let buffer = decompress_into_arc(uncompressed_length, reader)?;
        Ok(ArcBytes::from(buffer))
    }
441
442 pub fn is_empty(&self) -> bool {
444 self.inner.read().meta_files.is_empty()
445 }
446
447 pub fn write_batch<K: StoreKey + Send + Sync>(&self) -> Result<WriteBatch<K, S, FAMILIES>> {
452 if self.read_only {
453 bail!("Cannot write to a read-only database");
454 }
455 if self
456 .active_write_operation
457 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
458 .is_err()
459 {
460 bail!(
461 "Another write batch or compaction is already active (Only a single write \
462 operations is allowed at a time)"
463 );
464 }
465 let current = self.inner.read().current_sequence_number;
466 Ok(WriteBatch::new(
467 self.path.clone(),
468 current,
469 self.parallel_scheduler.clone(),
470 self.config.family_configs,
471 ))
472 }
473
474 pub fn clear_cache(&self) {
476 self.key_block_cache.clear();
477 self.value_block_cache.clear();
478 for meta in self.inner.write().meta_files.iter_mut() {
479 meta.clear_cache();
480 }
481 }
482
    /// Clears only the key and value block caches, leaving per-meta-file
    /// caches intact.
    pub fn clear_block_caches(&self) {
        self.key_block_cache.clear();
        self.value_block_cache.clear();
    }
488
489 pub fn prepare_all_sst_caches(&self) {
492 for meta in self.inner.write().meta_files.iter_mut() {
493 meta.prepare_sst_cache();
494 }
495 }
496
    /// Opens the LOG file for appending (creating it if needed).
    ///
    /// Only called from write paths; a read-only database never writes, so
    /// reaching this in read-only mode is a bug.
    fn open_log(&self) -> Result<BufWriter<File>> {
        if self.read_only {
            unreachable!("Only write operations can open the log file");
        }
        let log_path = self.path.join("LOG");
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(BufWriter::new(log_file))
    }
508
    /// Finishes and commits a write batch, then releases the single write
    /// slot claimed by `write_batch`.
    ///
    /// While finishing, each family's accessed-key-hash set is drained into
    /// an AMQF filter that the batch persists alongside the data.
    pub fn commit_write_batch<K: StoreKey + Send + Sync>(
        &self,
        mut write_batch: WriteBatch<K, S, FAMILIES>,
    ) -> Result<()> {
        if self.read_only {
            unreachable!("It's not possible to create a write batch for a read-only database");
        }
        let FinishResult {
            sequence_number,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written,
        } = write_batch.finish(|family| {
            let inner = self.inner.read();
            let set = &inner.accessed_key_hashes[family as usize];
            // Size the filter with ~5% headroom over the current set size.
            let initial_capacity = set.len() * 20 / 19;
            let mut amqf =
                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
                    .unwrap();
            // `retain` returning `false` drains the set while feeding every
            // hash into the filter in a single pass.
            set.retain(|hash| {
                amqf.insert_fingerprint(false, *hash)
                    .expect("Failed to insert fingerprint");
                false
            });
            amqf
        })?;
        // Make the new files durable and visible; nothing is deleted here.
        self.commit(CommitOptions {
            new_meta_files,
            new_sst_files,
            new_blob_files,
            sst_seq_numbers_to_delete: vec![],
            blob_seq_numbers_to_delete: vec![],
            sequence_number,
            keys_written,
        })?;
        // Release the write slot. NOTE(review): an early `?` return above
        // leaves the slot claimed, blocking further writes — presumably
        // intentional after a failed commit; confirm.
        self.active_write_operation.store(false, Ordering::Release);
        Ok(())
    }
557
    /// Commits a set of new files and deletions atomically.
    ///
    /// Protocol: (1) fsync all new files, (2) merge the new meta files into
    /// the in-memory state under the write lock, (3) if anything is deleted,
    /// write a `.del` file under a bumped sequence number, (4) persist the
    /// new sequence number in CURRENT (the commit point), (5) remove the
    /// deleted files, (6) append a human-readable summary to LOG. A crash
    /// before step 4 leaves files above CURRENT, which `load_directory`
    /// cleans up on the next open.
    fn commit(
        &self,
        CommitOptions {
            mut new_meta_files,
            new_sst_files,
            mut new_blob_files,
            mut sst_seq_numbers_to_delete,
            mut blob_seq_numbers_to_delete,
            sequence_number: mut seq,
            keys_written,
        }: CommitOptions,
    ) -> Result<(), anyhow::Error> {
        let time = Timestamp::now();

        new_meta_files.sort_unstable_by_key(|(seq, _)| *seq);

        // Step 1: make every new file durable before it becomes reachable.
        let sync_span = tracing::info_span!("sync new files").entered();
        let mut new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
                file.sync_all()?;
                let meta_file = MetaFile::open(&self.path, seq)?;
                Ok(meta_file)
            })?;

        // Apply the SST filter newest-first across the new meta files so a
        // newer file shadows SSTs made obsolete by it.
        let mut sst_filter = SstFilter::new();
        for meta_file in new_meta_files.iter_mut().rev() {
            sst_filter.apply_filter(meta_file);
        }

        self.parallel_scheduler.block_in_place(|| {
            for (_, file) in new_sst_files.iter() {
                file.sync_all()?;
            }
            for (_, file) in new_blob_files.iter() {
                file.sync_all()?;
            }
            anyhow::Ok(())
        })?;
        drop(sync_span);

        // Snapshot (meta seq, family, per-SST info, obsolete SSTs) for the
        // LOG output before the meta files are moved into `inner`.
        let new_meta_info = new_meta_files
            .iter()
            .map(|meta| {
                let ssts = meta
                    .entries()
                    .iter()
                    .map(|entry| {
                        let seq = entry.sequence_number();
                        let range = entry.range();
                        let size = entry.size();
                        let flags = entry.flags();
                        (seq, range.min_hash, range.max_hash, size, flags)
                    })
                    .collect::<Vec<_>>();
                (
                    meta.sequence_number(),
                    meta.family(),
                    ssts,
                    meta.obsolete_sst_files().to_vec(),
                )
            })
            .collect::<Vec<_>>();

        let has_delete_file;
        let mut meta_seq_numbers_to_delete = Vec::new();

        // Step 2: update the in-memory state under the write lock.
        {
            let mut inner = self.inner.write();
            // Continue the filter over the previously loaded meta files.
            for meta_file in inner.meta_files.iter_mut().rev() {
                sst_filter.apply_filter(meta_file);
            }
            inner.meta_files.append(&mut new_meta_files);
            // Retain walks front-to-back; reverse so it sees newest first,
            // then reverse back to restore sequence order.
            inner.meta_files.reverse();
            inner.meta_files.retain(|meta| {
                if sst_filter.apply_and_get_remove(meta) {
                    meta_seq_numbers_to_delete.push(meta.sequence_number());
                    false
                } else {
                    true
                }
            });
            inner.meta_files.reverse();
            has_delete_file = !sst_seq_numbers_to_delete.is_empty()
                || !blob_seq_numbers_to_delete.is_empty()
                || !meta_seq_numbers_to_delete.is_empty();
            // The .del file needs its own sequence number.
            if has_delete_file {
                seq += 1;
            }
            inner.current_sequence_number = seq;
        }

        self.parallel_scheduler.block_in_place(|| {
            // Step 3: persist the deletion intent before CURRENT moves, so a
            // crash after step 4 can still finish the deletes on reopen.
            if has_delete_file {
                sst_seq_numbers_to_delete.sort_unstable();
                meta_seq_numbers_to_delete.sort_unstable();
                blob_seq_numbers_to_delete.sort_unstable();
                let mut buf = Vec::with_capacity(
                    (sst_seq_numbers_to_delete.len()
                        + meta_seq_numbers_to_delete.len()
                        + blob_seq_numbers_to_delete.len())
                        * size_of::<u32>(),
                );
                for seq in sst_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in meta_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                for seq in blob_seq_numbers_to_delete.iter() {
                    buf.write_u32::<BE>(*seq)?;
                }
                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
                file.write_all(&buf)?;
                file.sync_all()?;
            }

            // Step 4: the commit point — CURRENT now references the new seq.
            let mut current_file = OpenOptions::new()
                .write(true)
                .truncate(false)
                .read(false)
                .open(self.path.join("CURRENT"))?;
            current_file.write_u32::<BE>(seq)?;
            current_file.sync_all()?;

            // Step 5: physically remove the files recorded in the .del file.
            for seq in sst_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
            }
            for seq in meta_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
            }
            for seq in blob_seq_numbers_to_delete.iter() {
                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
            }

            // Step 6: append a human-readable summary to LOG.
            {
                let mut log = self.open_log()?;
                writeln!(log, "Time {time}")?;
                let span = time.until(Timestamp::now())?;
                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
                writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?;
                for (meta_seq, family, ssts, obsolete) in new_meta_info {
                    for (seq, min, max, size, flags) in ssts {
                        writeln!(
                            log,
                            "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB, {})",
                            range_to_str(min, max),
                            size / 1024 / 1024,
                            flags
                        )?;
                    }
                    for obsolete in obsolete.chunks(15) {
                        write!(log, "{family:3} | {meta_seq:08} |")?;
                        for seq in obsolete {
                            write!(log, " {seq:08}")?;
                        }
                        writeln!(log, " OBSOLETE SST")?;
                    }
                }

                // Writes sequence numbers 15 per line under the given label.
                fn write_seq_numbers<W: std::io::Write, T, I>(
                    log: &mut W,
                    items: I,
                    label: &str,
                    extract_seq: fn(&T) -> u32,
                ) -> std::io::Result<()>
                where
                    I: IntoIterator<Item = T>,
                {
                    let items: Vec<T> = items.into_iter().collect();
                    for chunk in items.chunks(15) {
                        write!(log, " | |")?;
                        for item in chunk {
                            write!(log, " {:08}", extract_seq(item))?;
                        }
                        writeln!(log, " {}", label)?;
                    }
                    Ok(())
                }

                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
                write_seq_numbers(&mut log, new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
                write_seq_numbers(
                    &mut log,
                    blob_seq_numbers_to_delete,
                    "BLOB DELETED",
                    |&seq| seq,
                )?;
                write_seq_numbers(&mut log, sst_seq_numbers_to_delete, "SST DELETED", |&seq| {
                    seq
                })?;
                write_seq_numbers(
                    &mut log,
                    meta_seq_numbers_to_delete,
                    "META DELETED",
                    |&seq| seq,
                )?;
                // With the verbose_log feature, also dump the full state.
                #[cfg(feature = "verbose_log")]
                {
                    writeln!(log, "New database state:")?;
                    writeln!(log, "FAM | META SEQ | SST SEQ FLAGS | RANGE")?;
                    let inner = self.inner.read();
                    // Deduplicate families while preserving first-seen order.
                    let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
                        let mut set = HashSet::new();
                        move |family| set.insert(*family)
                    });
                    for family in families {
                        for meta in inner.meta_files.iter() {
                            if meta.family() != family {
                                continue;
                            }
                            let meta_seq = meta.sequence_number();
                            for entry in meta.entries().iter() {
                                let seq = entry.sequence_number();
                                let range = entry.range();
                                writeln!(
                                    log,
                                    "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}",
                                    entry.flags(),
                                    range_to_str(range.min_hash, range.max_hash)
                                )?;
                            }
                        }
                    }
                }
            }
            anyhow::Ok(())
        })?;
        Ok(())
    }
792
793 pub fn full_compact(&self) -> Result<()> {
796 self.compact(&CompactConfig {
797 min_merge_count: 2,
798 optimal_merge_count: usize::MAX,
799 max_merge_count: usize::MAX,
800 max_merge_bytes: u64::MAX,
801 min_merge_duplication_bytes: 0,
802 optimal_merge_duplication_bytes: u64::MAX,
803 max_merge_segment_count: usize::MAX,
804 })?;
805 Ok(())
806 }
807
    /// Compacts the database according to `compact_config`.
    ///
    /// Claims the single write slot (like a write batch), merges SST files
    /// via `compact_internal`, and commits the result. Returns `Ok(true)`
    /// when any files were merged and committed, `Ok(false)` when there was
    /// nothing to do.
    pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
        if self.read_only {
            bail!("Compaction is not allowed on a read only database");
        }
        let _span = tracing::info_span!("compact database").entered();
        // Claim the single write slot shared with write batches.
        if self
            .active_write_operation
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            bail!(
                "Another write batch or compaction is already active (Only a single write \
                 operations is allowed at a time)"
            );
        }

        // Drop cached blocks so compaction reads fresh data from disk.
        self.clear_cache();

        let mut sequence_number;
        let mut new_meta_files = Vec::new();
        let mut new_sst_files = Vec::new();
        let mut sst_seq_numbers_to_delete = Vec::new();
        let mut blob_seq_numbers_to_delete = Vec::new();
        let mut keys_written = 0;

        {
            let inner = self.inner.read();
            // Sequence numbers for new files are handed out atomically so
            // parallel merge jobs can allocate them concurrently.
            sequence_number = AtomicU32::new(inner.current_sequence_number);
            self.compact_internal(
                &inner.meta_files,
                &sequence_number,
                &mut new_meta_files,
                &mut new_sst_files,
                &mut sst_seq_numbers_to_delete,
                &mut blob_seq_numbers_to_delete,
                &mut keys_written,
                compact_config,
            )
            .context("Failed to compact database")?;
        }

        let has_changes = !new_meta_files.is_empty();
        if has_changes {
            self.commit(CommitOptions {
                new_meta_files,
                new_sst_files,
                new_blob_files: Vec::new(),
                sst_seq_numbers_to_delete,
                blob_seq_numbers_to_delete,
                sequence_number: *sequence_number.get_mut(),
                keys_written,
            })
            .context("Failed to commit the database compaction")?;
        }

        // Release the write slot. NOTE(review): an early `?` return above
        // leaves the slot claimed — presumably intentional so a failed
        // compaction blocks further writes; confirm.
        self.active_write_operation.store(false, Ordering::Release);

        Ok(has_changes)
    }
875
876 fn compact_internal(
878 &self,
879 meta_files: &[MetaFile],
880 sequence_number: &AtomicU32,
881 new_meta_files: &mut Vec<(u32, File)>,
882 new_sst_files: &mut Vec<(u32, File)>,
883 sst_seq_numbers_to_delete: &mut Vec<u32>,
884 blob_seq_numbers_to_delete: &mut Vec<u32>,
885 keys_written: &mut u64,
886 compact_config: &CompactConfig,
887 ) -> Result<()> {
888 if meta_files.is_empty() {
889 return Ok(());
890 }
891
892 struct SstWithRange {
893 meta_index: usize,
894 index_in_meta: u32,
895 seq: u32,
896 range: StaticSortedFileRange,
897 size: u64,
898 flags: MetaEntryFlags,
899 }
900
901 impl Compactable for SstWithRange {
902 fn range(&self) -> RangeInclusive<u64> {
903 self.range.min_hash..=self.range.max_hash
904 }
905
906 fn size(&self) -> u64 {
907 self.size
908 }
909
910 fn category(&self) -> u8 {
911 if self.flags.cold() { 1 } else { 0 }
914 }
915 }
916
917 let ssts_with_ranges = meta_files
918 .iter()
919 .enumerate()
920 .flat_map(|(meta_index, meta)| {
921 meta.entries()
922 .iter()
923 .enumerate()
924 .map(move |(index_in_meta, entry)| SstWithRange {
925 meta_index,
926 index_in_meta: index_in_meta as u32,
927 seq: entry.sequence_number(),
928 range: entry.range(),
929 size: entry.size(),
930 flags: entry.flags(),
931 })
932 })
933 .collect::<Vec<_>>();
934
935 let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
936
937 for sst in ssts_with_ranges {
938 sst_by_family[sst.range.family as usize].push(sst);
939 }
940
941 let path = &self.path;
942
943 let log_mutex = Mutex::new(());
944
945 struct PartialResultPerFamily {
946 new_meta_file: Option<(u32, File)>,
947 new_sst_files: Vec<(u32, File)>,
948 sst_seq_numbers_to_delete: Vec<u32>,
949 blob_seq_numbers_to_delete: Vec<u32>,
950 keys_written: u64,
951 }
952
953 let mut compact_config = compact_config.clone();
954 let merge_jobs = sst_by_family
955 .into_iter()
956 .enumerate()
957 .filter_map(|(family, ssts_with_ranges)| {
958 if compact_config.max_merge_segment_count == 0 {
959 return None;
960 }
961 let (merge_jobs, real_merge_job_size) =
962 get_merge_segments(&ssts_with_ranges, &compact_config);
963 compact_config.max_merge_segment_count -= real_merge_job_size;
964 Some((family, ssts_with_ranges, merge_jobs))
965 })
966 .collect::<Vec<_>>();
967
968 let result = self
969 .parallel_scheduler
970 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
971 merge_jobs,
972 |(family, ssts_with_ranges, merge_jobs)| {
973 let family = family as u32;
974
975 if merge_jobs.is_empty() {
976 return Ok(PartialResultPerFamily {
977 new_meta_file: None,
978 new_sst_files: Vec::new(),
979 sst_seq_numbers_to_delete: Vec::new(),
980 blob_seq_numbers_to_delete: Vec::new(),
981 keys_written: 0,
982 });
983 }
984
985 let used_key_hashes: Option<qfilter::Filter> = {
990 let filters: Vec<qfilter::Filter> = meta_files
991 .iter()
992 .filter(|m| m.family() == family)
993 .filter_map(|meta_file| {
994 meta_file.deserialize_used_key_hashes_amqf().transpose()
995 })
996 .collect::<Result<Vec<_>>>()?
997 .into_iter()
998 .filter(|amqf| !amqf.is_empty())
999 .collect();
1000 if filters.is_empty() {
1001 None
1002 } else if filters.len() == 1 {
1003 filters.into_iter().next()
1005 } else {
1006 let total_len: u64 = filters.iter().map(|f| f.len()).sum();
1007 let mut merged =
1008 qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
1009 .expect("Failed to create merged AMQF filter");
1010 for filter in &filters {
1011 merged
1012 .merge(false, filter)
1013 .expect("Failed to merge AMQF filters");
1014 }
1015 merged.shrink_to_fit();
1016 Some(merged)
1017 }
1018 };
1019
1020 let sst_seq_numbers_to_delete = merge_jobs
1022 .iter()
1023 .filter(|l| l.len() > 1)
1024 .flat_map(|l| l.iter().copied())
1025 .map(|index| ssts_with_ranges[index].seq)
1026 .collect::<Vec<_>>();
1027
1028 let span = tracing::trace_span!("merge files");
1030 enum PartialMergeResult<'l> {
1031 Merged {
1032 new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1033 blob_seq_numbers_to_delete: Vec<u32>,
1034 keys_written: u64,
1035 indices: SmallVec<[usize; 1]>,
1036 },
1037 Move {
1038 seq: u32,
1039 meta: StaticSortedFileBuilderMeta<'l>,
1040 },
1041 }
1042 let merge_result = self
1043 .parallel_scheduler
1044 .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
1045 let _span = span.clone().entered();
1046 if indices.len() == 1 {
1047 let index = indices[0];
1049 let meta_index = ssts_with_ranges[index].meta_index;
1050 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1051 let meta_file = &meta_files[meta_index];
1052 let entry = meta_file.entry(index_in_meta);
1053 let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
1054 let meta = StaticSortedFileBuilderMeta {
1055 min_hash: entry.min_hash(),
1056 max_hash: entry.max_hash(),
1057 amqf,
1058 block_count: entry.block_count(),
1059 size: entry.size(),
1060 flags: entry.flags(),
1061 entries: 0,
1062 };
1063 return Ok(PartialMergeResult::Move {
1064 seq: entry.sequence_number(),
1065 meta,
1066 });
1067 }
1068
1069 let iters = indices
1073 .iter()
1074 .map(|&index| {
1075 let meta_index = ssts_with_ranges[index].meta_index;
1076 let index_in_meta = ssts_with_ranges[index].index_in_meta;
1077 let entry = meta_files[meta_index].entry(index_in_meta);
1078 StaticSortedFile::open_for_compaction(
1079 path,
1080 entry.sst_metadata(),
1081 )?
1082 .try_into_iter()
1083 })
1084 .collect::<Result<Vec<_>>>()?;
1085
1086 let iter = MergeIter::new(iters.into_iter())?;
1087
1088 let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
1091
1092 struct Collector {
1093 writer: Option<(u32, StreamingSstWriter<LookupEntry>)>,
1100 flags: MetaEntryFlags,
1101 new_sst_files:
1102 Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
1103 last_hash: Option<u64>,
1106 }
1107 impl Collector {
1108 fn new(flags: MetaEntryFlags) -> Self {
1109 Self {
1110 writer: None,
1111 flags,
1112 new_sst_files: Vec::new(),
1113 last_hash: None,
1114 }
1115 }
1116
1117 fn ensure_writer(
1119 &mut self,
1120 path: &Path,
1121 sequence_number: &AtomicU32,
1122 ) -> Result<&mut StreamingSstWriter<LookupEntry>>
1123 {
1124 if self.writer.is_none() {
1125 let seq =
1126 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1127 let sst_path = path.join(format!("{seq:08}.sst"));
1128 let writer = StreamingSstWriter::new(
1129 &sst_path,
1130 self.flags,
1131 MAX_ENTRIES_PER_COMPACTED_FILE as u64,
1132 )?;
1133 self.writer = Some((seq, writer));
1134 }
1135 Ok(&mut self.writer.as_mut().unwrap().1)
1136 }
1137
1138 fn close_sst_file(&mut self, keys_written: &mut u64) -> Result<()> {
1142 if let Some((seq, writer)) = self.writer.take() {
1143 let _span =
1144 tracing::trace_span!("close merged sst file").entered();
1145 let (meta, file) = writer.close()?;
1146 *keys_written += meta.entries;
1147 self.new_sst_files.push((seq, file, meta));
1148 }
1149 Ok(())
1150 }
1151
1152 fn add_entry(
1156 &mut self,
1157 entry: LookupEntry,
1158 path: &Path,
1159 sequence_number: &AtomicU32,
1160 keys_written: &mut u64,
1161 ) -> Result<()> {
1162 let key_changed = self.last_hash != Some(entry.hash);
1163 if key_changed
1166 && let Some((_, ref writer)) = self.writer
1167 && writer.is_full(
1168 MAX_ENTRIES_PER_COMPACTED_FILE,
1169 DATA_THRESHOLD_PER_COMPACTED_FILE,
1170 )
1171 {
1172 self.close_sst_file(keys_written)?;
1173 }
1174 self.last_hash = Some(entry.hash);
1175 let writer = self.ensure_writer(path, sequence_number)?;
1176 writer.add(entry)?;
1177 Ok(())
1178 }
1179 }
1180 #[cfg(debug_assertions)]
1181 impl Drop for Collector {
1182 fn drop(&mut self) {
1183 if !std::thread::panicking() {
1184 assert!(
1185 self.writer.is_none(),
1186 "Collector dropped with an open writer"
1187 );
1188 }
1189 }
1190 }
1191 let mut used_collector = Collector::new(MetaEntryFlags::WARM);
1192 let mut unused_collector = Collector::new(MetaEntryFlags::COLD);
1193 let mut current_key: Option<ArcBytes> = None;
1194 let mut keys_written = 0;
1195
1196 let mut skip_remaining_for_this_key = false;
1203 let family_config = &self.config.family_configs[family as usize];
1204
1205 for entry in iter {
1206 let entry = entry?;
1207 if current_key.as_ref() != Some(&entry.key) {
1208 skip_remaining_for_this_key = false;
1210 current_key = Some(entry.key.clone());
1211 }
1212 if !skip_remaining_for_this_key {
1213 let is_used = used_key_hashes
1214 .as_ref()
1215 .is_some_and(|amqf| amqf.contains_fingerprint(entry.hash));
1216 let collector = if is_used {
1217 &mut used_collector
1218 } else {
1219 &mut unused_collector
1220 };
1221 match family_config.kind {
1222 FamilyKind::MultiValue => {
1223 if matches!(
1226 entry.value,
1227 LazyLookupValue::Eager(LookupValue::Deleted)
1228 ) {
1229 skip_remaining_for_this_key = true;
1230 }
1231 }
1232 FamilyKind::SingleValue => {
1233 skip_remaining_for_this_key = true;
1236 }
1237 }
1238 collector.add_entry(
1239 entry,
1240 path,
1241 sequence_number,
1242 &mut keys_written,
1243 )?;
1244 }
1245 }
1246
1247 used_collector.close_sst_file(&mut keys_written)?;
1249 unused_collector.close_sst_file(&mut keys_written)?;
1250
1251 let mut new_sst_files = take(&mut unused_collector.new_sst_files);
1252 new_sst_files.append(&mut used_collector.new_sst_files);
1253 Ok(PartialMergeResult::Merged {
1254 new_sst_files,
1255 blob_seq_numbers_to_delete,
1256 keys_written,
1257 indices,
1258 })
1259 })
1260 .with_context(|| {
1261 format!("Failed to merge database files for family {family}")
1262 })?;
1263
1264 let Some((sst_files_len, blob_delete_len)) = merge_result
1265 .iter()
1266 .map(|r| {
1267 if let PartialMergeResult::Merged {
1268 new_sst_files,
1269 blob_seq_numbers_to_delete,
1270 indices: _,
1271 keys_written: _,
1272 } = r
1273 {
1274 (new_sst_files.len(), blob_seq_numbers_to_delete.len())
1275 } else {
1276 (0, 0)
1277 }
1278 })
1279 .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2))
1280 else {
1281 unreachable!()
1282 };
1283
1284 let mut new_sst_files = Vec::with_capacity(sst_files_len);
1285 let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
1286
1287 let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
1288 let mut meta_file_builder = MetaFileBuilder::new(family);
1289
1290 let mut keys_written = 0;
1291 self.parallel_scheduler.block_in_place(|| {
1292 let guard = log_mutex.lock();
1293 let mut log = self.open_log()?;
1294 writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?;
1295 for result in merge_result {
1296 match result {
1297 PartialMergeResult::Merged {
1298 new_sst_files: merged_new_sst_files,
1299 blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
1300 keys_written: merged_keys_written,
1301 indices,
1302 } => {
1303 writeln!(
1304 log,
1305 "{family:3} | {meta_seq:08} | MERGE \
1306 ({merged_keys_written} keys):"
1307 )?;
1308 for i in indices.iter() {
1309 let seq = ssts_with_ranges[*i].seq;
1310 let (min, max) = ssts_with_ranges[*i].range().into_inner();
1311 writeln!(
1312 log,
1313 "{family:3} | {meta_seq:08} | {seq:08} INPUT | {}",
1314 range_to_str(min, max)
1315 )?;
1316 }
1317 for (seq, file, meta) in merged_new_sst_files {
1318 let min = meta.min_hash;
1319 let max = meta.max_hash;
1320 writeln!(
1321 log,
1322 "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \
1323 ({})",
1324 range_to_str(min, max),
1325 meta.flags
1326 )?;
1327
1328 meta_file_builder.add(seq, meta);
1329 new_sst_files.push((seq, file));
1330 }
1331 blob_seq_numbers_to_delete
1332 .extend(merged_blob_seq_numbers_to_delete);
1333 keys_written += merged_keys_written;
1334 }
1335 PartialMergeResult::Move { seq, meta } => {
1336 let min = meta.min_hash;
1337 let max = meta.max_hash;
1338 writeln!(
1339 log,
1340 "{family:3} | {meta_seq:08} | {seq:08} MOVED | {}",
1341 range_to_str(min, max)
1342 )?;
1343
1344 meta_file_builder.add(seq, meta);
1345 }
1346 }
1347 }
1348 drop(log);
1349 drop(guard);
1350
1351 anyhow::Ok(())
1352 })?;
1353
1354 for &seq in sst_seq_numbers_to_delete.iter() {
1355 meta_file_builder.add_obsolete_sst_file(seq);
1356 }
1357
1358 let meta_file = {
1359 let _span = tracing::trace_span!("write meta file").entered();
1360 self.parallel_scheduler
1361 .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
1362 };
1363
1364 Ok(PartialResultPerFamily {
1365 new_meta_file: Some((meta_seq, meta_file)),
1366 new_sst_files,
1367 sst_seq_numbers_to_delete,
1368 blob_seq_numbers_to_delete,
1369 keys_written,
1370 })
1371 },
1372 )?;
1373
1374 for PartialResultPerFamily {
1375 new_meta_file: inner_new_meta_file,
1376 new_sst_files: mut inner_new_sst_files,
1377 sst_seq_numbers_to_delete: mut inner_sst_seq_numbers_to_delete,
1378 blob_seq_numbers_to_delete: mut inner_blob_seq_numbers_to_delete,
1379 keys_written: inner_keys_written,
1380 } in result
1381 {
1382 new_meta_files.extend(inner_new_meta_file);
1383 new_sst_files.append(&mut inner_new_sst_files);
1384 sst_seq_numbers_to_delete.append(&mut inner_sst_seq_numbers_to_delete);
1385 blob_seq_numbers_to_delete.append(&mut inner_blob_seq_numbers_to_delete);
1386 *keys_written += inner_keys_written;
1387 }
1388
1389 Ok(())
1390 }
1391
1392 pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcBytes>> {
1395 debug_assert!(family < FAMILIES, "Family index out of bounds");
1396 if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1397 panic!(
1399 "only single valued tables can be queried with `get', call `get_multiple` instead"
1400 )
1401 }
1402 let span = tracing::trace_span!(
1403 "database read",
1404 name = family,
1405 result_size = tracing::field::Empty
1406 )
1407 .entered();
1408 let results = self.get_impl::<K, false>(family, key, &span)?;
1409 debug_assert!(results.len() <= 1, "get() should return at most one result");
1410 Ok(results.into_iter().next())
1411 }
1412
1413 pub fn get_multiple<K: QueryKey>(
1423 &self,
1424 family: usize,
1425 key: &K,
1426 ) -> Result<SmallVec<[ArcBytes; 1]>> {
1427 debug_assert!(family < FAMILIES, "Family index out of bounds");
1428 if self.config.family_configs[family].kind != FamilyKind::MultiValue {
1429 panic!("only multi-valued tables can be queried with `get_multiple`")
1431 }
1432 let span = tracing::trace_span!(
1433 "database read multiple",
1434 name = family,
1435 result_count = tracing::field::Empty,
1436 result_size = tracing::field::Empty
1437 )
1438 .entered();
1439 let results = self.get_impl::<K, true>(family, key, &span)?;
1440 Ok(results)
1441 }
1442
    /// Shared lookup core for `get` (`FIND_ALL == false`) and `get_multiple`
    /// (`FIND_ALL == true`).
    ///
    /// Scans the meta files from newest to oldest. With `FIND_ALL == false`,
    /// the first value found is returned immediately (a tombstone yields an
    /// empty result). With `FIND_ALL == true`, values are accumulated across
    /// files until a deletion tombstone — or the end of the meta files —
    /// terminates the scan. The trace `span` gets `result_size` (and the
    /// caller records `result_count` for the multi-value path).
    fn get_impl<K: QueryKey, const FIND_ALL: bool>(
        &self,
        family: usize,
        key: &K,
        span: &EnteredSpan,
    ) -> Result<SmallVec<[ArcBytes; 1]>> {
        let hash = hash_key(key);
        let inner = self.inner.read();
        let mut output: SmallVec<[ArcBytes; 1]> = SmallVec::new();
        // Tracks whether any SST actually contained the key so that a
        // whole-database miss can be counted once at the end.
        #[cfg(feature = "stats")]
        let mut found_in_sst = false;

        // Total byte size of returned values, recorded on the span.
        let mut size = 0;

        // Newer meta files shadow older ones, hence the reverse iteration.
        for meta in inner.meta_files.iter().rev() {
            match meta.lookup::<K, FIND_ALL>(
                family as u32,
                hash,
                key,
                &self.key_block_cache,
                &self.value_block_cache,
            )? {
                // Meta file holds a different family — skip cheaply.
                MetaLookupResult::FamilyMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
                }
                // Key hash outside the file's [min_hash, max_hash] range.
                MetaLookupResult::RangeMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_range.fetch_add(1, Ordering::Relaxed);
                }
                // The approximate membership filter ruled the key out.
                MetaLookupResult::QuickFilterMiss => {
                    #[cfg(feature = "stats")]
                    self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                }
                MetaLookupResult::SstLookup(result) => match result {
                    SstLookupResult::Found(values) => {
                        #[cfg(feature = "stats")]
                        {
                            found_in_sst = true;
                        }
                        // Record the access — presumably this feeds the
                        // warm/cold classification used during compaction;
                        // confirm against the compaction code.
                        inner.accessed_key_hashes[family].insert(hash);
                        for value in values {
                            match value {
                                // A tombstone shadows all older values: stop
                                // scanning and return whatever was collected.
                                LookupValue::Deleted => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", "deleted");
                                        return Ok(SmallVec::new());
                                    }
                                    if output.is_empty() {
                                        span.record("result_size", "deleted");
                                    } else {
                                        span.record("result_size", size);
                                    }
                                    return Ok(output);
                                }
                                // Small value stored inline in the SST.
                                LookupValue::Slice { value } => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                                    if !FIND_ALL {
                                        span.record("result_size", value.len());
                                        return Ok(SmallVec::from_buf([value]));
                                    }
                                    size += value.len();
                                    output.push(value);
                                }
                                // Large value stored in a separate blob file,
                                // addressed by its sequence number.
                                LookupValue::Blob { sequence_number } => {
                                    #[cfg(feature = "stats")]
                                    self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                                    let blob = self.read_blob(sequence_number)?;
                                    if !FIND_ALL {
                                        span.record("result_size", blob.len());
                                        return Ok(SmallVec::from_buf([blob]));
                                    }
                                    size += blob.len();
                                    output.push(blob);
                                }
                            }
                        }
                    }
                    // Filter passed but the SST didn't contain the key
                    // (false positive of the quick filter).
                    SstLookupResult::NotFound => {
                        #[cfg(feature = "stats")]
                        self.stats.miss_key.fetch_add(1, Ordering::Relaxed);
                    }
                },
            }
        }

        #[cfg(feature = "stats")]
        if !found_in_sst {
            self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
        }

        if FIND_ALL {
            span.record("result_count", output.len());
        }
        if output.is_empty() {
            span.record("result_size", "not_found");
        } else {
            span.record("result_size", size);
        }
        Ok(output)
    }
1558
1559 pub fn batch_get<K: QueryKey>(
1560 &self,
1561 family: usize,
1562 keys: &[K],
1563 ) -> Result<Vec<Option<ArcBytes>>> {
1564 debug_assert!(family < FAMILIES, "Family index out of bounds");
1565 if self.config.family_configs[family].kind != FamilyKind::SingleValue {
1566 panic!("only single valued tables can be queried with `batch_get'")
1568 }
1569 let span = tracing::trace_span!(
1570 "database batch read",
1571 name = family,
1572 keys = keys.len(),
1573 not_found = tracing::field::Empty,
1574 deleted = tracing::field::Empty,
1575 result_size = tracing::field::Empty
1576 )
1577 .entered();
1578 let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
1579 let mut empty_cells = keys.len();
1580 for (index, key) in keys.iter().enumerate() {
1581 let hash = hash_key(key);
1582 cells.push((hash, index, None));
1583 }
1584 cells.sort_by_key(|(hash, _, _)| *hash);
1585 let inner = self.inner.read();
1586 for meta in inner.meta_files.iter().rev() {
1587 let _result = meta.batch_lookup(
1588 family as u32,
1589 keys,
1590 &mut cells,
1591 &mut empty_cells,
1592 &self.key_block_cache,
1593 &self.value_block_cache,
1594 )?;
1595
1596 #[cfg(feature = "stats")]
1597 {
1598 let crate::meta_file::MetaBatchLookupResult {
1599 family_miss,
1600 range_misses,
1601 quick_filter_misses,
1602 sst_misses,
1603 hits: _,
1604 } = _result;
1605 if family_miss {
1606 self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
1607 }
1608 if range_misses > 0 {
1609 self.stats
1610 .miss_range
1611 .fetch_add(range_misses as u64, Ordering::Relaxed);
1612 }
1613 if quick_filter_misses > 0 {
1614 self.stats
1615 .miss_amqf
1616 .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
1617 }
1618 if sst_misses > 0 {
1619 self.stats
1620 .miss_key
1621 .fetch_add(sst_misses as u64, Ordering::Relaxed);
1622 }
1623 }
1624
1625 if empty_cells == 0 {
1626 break;
1627 }
1628 }
1629 let mut deleted = 0;
1630 let mut not_found = 0;
1631 let mut result_size = 0;
1632 let mut results = vec![None; keys.len()];
1633 for (hash, index, result) in cells {
1634 if let Some(result) = result {
1635 inner.accessed_key_hashes[family].insert(hash);
1636 let result = match result {
1637 LookupValue::Deleted => {
1638 #[cfg(feature = "stats")]
1639 self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
1640 deleted += 1;
1641 None
1642 }
1643 LookupValue::Slice { value } => {
1644 #[cfg(feature = "stats")]
1645 self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
1646 result_size += value.len();
1647 Some(value)
1648 }
1649 LookupValue::Blob { sequence_number } => {
1650 #[cfg(feature = "stats")]
1651 self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
1652 let blob = self.read_blob(sequence_number)?;
1653 result_size += blob.len();
1654 Some(blob)
1655 }
1656 };
1657 results[index] = result;
1658 } else {
1659 #[cfg(feature = "stats")]
1660 self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
1661 not_found += 1;
1662 }
1663 }
1664 span.record("not_found", not_found);
1665 span.record("deleted", deleted);
1666 span.record("result_size", result_size);
1667 Ok(results)
1668 }
1669
    #[cfg(feature = "stats")]
    /// Builds a [`Statistics`] snapshot from the live hit/miss counters and
    /// the two block caches (only available with the `stats` feature).
    pub fn statistics(&self) -> Statistics {
        let inner = self.inner.read();
        Statistics {
            meta_files: inner.meta_files.len(),
            // One entry per SST file referenced, summed across meta files.
            sst_files: inner.meta_files.iter().map(|m| m.entries().len()).sum(),
            key_block_cache: CacheStatistics::new(&self.key_block_cache),
            value_block_cache: CacheStatistics::new(&self.value_block_cache),
            // A "hit" is any lookup that resolved to a value — including a
            // deletion tombstone.
            hits: self.stats.hits_deleted.load(Ordering::Relaxed)
                + self.stats.hits_small.load(Ordering::Relaxed)
                + self.stats.hits_blob.load(Ordering::Relaxed),
            misses: self.stats.miss_global.load(Ordering::Relaxed),
            miss_family: self.stats.miss_family.load(Ordering::Relaxed),
            miss_range: self.stats.miss_range.load(Ordering::Relaxed),
            miss_amqf: self.stats.miss_amqf.load(Ordering::Relaxed),
            miss_key: self.stats.miss_key.load(Ordering::Relaxed),
        }
    }
1689
1690 pub fn meta_info(&self) -> Result<Vec<MetaFileInfo>> {
1691 Ok(self
1692 .inner
1693 .read()
1694 .meta_files
1695 .iter()
1696 .rev()
1697 .map(|meta_file| {
1698 let entries = meta_file
1699 .entries()
1700 .iter()
1701 .map(|entry| {
1702 let amqf = entry.raw_amqf(meta_file.amqf_data());
1703 MetaFileEntryInfo {
1704 sequence_number: entry.sequence_number(),
1705 min_hash: entry.min_hash(),
1706 max_hash: entry.max_hash(),
1707 sst_size: entry.size(),
1708 flags: entry.flags(),
1709 amqf_size: entry.amqf_size(),
1710 amqf_entries: amqf.len(),
1711 block_count: entry.block_count(),
1712 }
1713 })
1714 .collect();
1715 MetaFileInfo {
1716 sequence_number: meta_file.sequence_number(),
1717 family: meta_file.family(),
1718 obsolete_sst_files: meta_file.obsolete_sst_files().to_vec(),
1719 entries,
1720 }
1721 })
1722 .collect())
1723 }
1724
1725 pub fn shutdown(&self) -> Result<()> {
1727 #[cfg(feature = "print_stats")]
1728 println!("{:#?}", self.statistics());
1729 Ok(())
1730 }
1731}
1732
/// Renders a key-hash range as a fixed-width ASCII bar for the compaction
/// log, followed by the raw hex bounds.
///
/// The full `u64` hash space is scaled down to `DISPLAY_SIZE` columns:
/// `[` marks the start column, `]` the end column, `O` a range that
/// collapses into a single column, and `=` fills the interior.
fn range_to_str(min: u64, max: u64) -> String {
    use std::fmt::Write;
    const DISPLAY_SIZE: usize = 100;
    const TOTAL_SIZE: u64 = u64::MAX;
    // Scale a hash into a display column (128-bit math avoids overflow).
    let scale = |hash: u64| (hash as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize;
    let start_pos = scale(min);
    let end_pos = scale(max);
    let mut range_str: String = (0..DISPLAY_SIZE)
        .map(|i| match (i == start_pos, i == end_pos) {
            (true, true) => 'O',
            (true, false) => '[',
            (false, true) => ']',
            _ if i > start_pos && i < end_pos => '=',
            _ => ' ',
        })
        .collect();
    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
    range_str
}
1756
/// Introspection summary of a single meta file, as produced by `meta_info`.
pub struct MetaFileInfo {
    /// Sequence number of the meta file itself.
    pub sequence_number: u32,
    /// Key family this meta file belongs to.
    pub family: u32,
    /// Sequence numbers of SST files this meta file marks as obsolete.
    pub obsolete_sst_files: Vec<u32>,
    /// One entry per SST file referenced by this meta file.
    pub entries: Vec<MetaFileEntryInfo>,
}
1763
/// Introspection summary of one SST entry inside a meta file.
pub struct MetaFileEntryInfo {
    /// Sequence number of the referenced SST file.
    pub sequence_number: u32,
    /// Smallest key hash covered by the SST file.
    pub min_hash: u64,
    /// Largest key hash covered by the SST file.
    pub max_hash: u64,
    /// Size of the entry's AMQF (approximate membership query filter) —
    /// presumably in bytes; confirm against the meta-file format.
    pub amqf_size: u32,
    /// Number of fingerprints stored in the AMQF.
    pub amqf_entries: usize,
    /// Size recorded for the SST file in the meta entry.
    pub sst_size: u64,
    /// Entry flags (e.g. `WARM`/`COLD` assigned during compaction).
    pub flags: MetaEntryFlags,
    /// Block count recorded for the SST file — assumed to be the number of
    /// data blocks; verify against `StaticSortedFile`.
    pub block_count: u16,
}