1use std::{
2 cmp::Ordering,
3 fmt::Display,
4 fs::File,
5 path::{Path, PathBuf},
6 sync::OnceLock,
7};
8
9use anyhow::{Context, Result, bail};
10use bitfield::bitfield;
11use byteorder::{BE, ReadBytesExt};
12use memmap2::{Mmap, MmapOptions};
13use smallvec::SmallVec;
14use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, big_endian as be};
15
16use crate::{
17 QueryKey,
18 lookup_entry::LookupValue,
19 mmap_helper::advise_mmap_for_persistence,
20 static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData},
21};
22
// Bit flags stored for every entry of a meta file, backed by a `u32`.
//
// The `Display` implementation in this file reports "fresh" when bit 1 is set, otherwise
// "cold" when bit 0 is set, otherwise "warm".
bitfield! {
    #[derive(Clone, Copy, Default)]
    pub struct MetaEntryFlags(u32);
    impl Debug;
    impl From<u32>;
    // Bit 0: the `cold` flag.
    pub cold, set_cold: 0;
    // Bit 1: the `fresh` flag.
    pub fresh, set_fresh: 1;
}
33
34impl MetaEntryFlags {
35 pub const FRESH: MetaEntryFlags = MetaEntryFlags(0b10);
36 pub const COLD: MetaEntryFlags = MetaEntryFlags(0b01);
37 pub const WARM: MetaEntryFlags = MetaEntryFlags(0b00);
38}
39
40impl Display for MetaEntryFlags {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 if self.fresh() {
43 f.pad_integral(true, "", "fresh")
44 } else if self.cold() {
45 f.pad_integral(true, "", "cold")
46 } else {
47 f.pad_integral(true, "", "warm")
48 }
49 }
50}
51
/// Fixed-size header of a single entry as it is laid out inside a meta file.
///
/// The struct is `repr(C, packed)` and consists only of big-endian integer wrappers, so it is
/// read directly from the mapped file bytes (see `MetaFile::open_internal`). The field order is
/// the on-disk order and must not be changed.
#[repr(C, packed)]
#[derive(FromBytes, IntoBytes, Immutable, KnownLayout, Clone, Copy)]
pub(crate) struct EntryHeader {
    /// Sequence number of the SST file the entry refers to.
    sequence_number: be::U32,
    /// Block count recorded for the SST file.
    block_count: be::U16,
    /// Lower bound (inclusive) of the key hashes covered by the entry.
    min_hash: be::U64,
    /// Upper bound (inclusive) of the key hashes covered by the entry.
    max_hash: be::U64,
    /// Size value recorded for the entry (presumably the SST file size in bytes — confirm).
    size: be::U64,
    /// Raw bits of the entry's `MetaEntryFlags`.
    flags: be::U32,
    /// End offset of this entry's AMQF bytes, relative to the start of the AMQF data section.
    /// The start offset is the end offset of the preceding entry (0 for the first one).
    amqf_end_offset: be::U32,
}
66
67impl EntryHeader {
68 pub(crate) fn new(
69 sequence_number: u32,
70 block_count: u16,
71 min_hash: u64,
72 max_hash: u64,
73 size: u64,
74 flags: MetaEntryFlags,
75 amqf_end_offset: u32,
76 ) -> Self {
77 Self {
78 sequence_number: be::U32::new(sequence_number),
79 block_count: be::U16::new(block_count),
80 min_hash: be::U64::new(min_hash),
81 max_hash: be::U64::new(max_hash),
82 size: be::U64::new(size),
83 flags: be::U32::new(flags.0),
84 amqf_end_offset: be::U32::new(amqf_end_offset),
85 }
86 }
87}
88
/// In-memory representation of one entry of a meta file, describing a single SST file.
pub struct MetaEntry {
    /// Sequence number and block count of the SST file, as read from the entry header.
    sst_data: StaticSortedFileMetaData,
    /// Family of the meta file this entry was read from.
    family: u32,
    /// Smallest key hash covered by the SST file (inclusive).
    min_hash: u64,
    /// Largest key hash covered by the SST file (inclusive).
    max_hash: u64,
    /// Size value read from the entry header (presumably the SST file size in bytes — confirm).
    size: u64,
    /// Flags read from the entry header.
    flags: MetaEntryFlags,
    /// Byte range of this entry's AMQF, relative to the start of the meta file's AMQF data
    /// section.
    amqf_data_offset: std::ops::Range<u32>,
    /// AMQF deserialized from the mapped meta file bytes. The `'static` lifetime is produced by
    /// a `transmute` in `MetaFile::open_internal`; the value's real lifetime is tied to the
    /// `mmap` owned by the containing `MetaFile`.
    amqf: qfilter::FilterRef<'static>,
    /// The SST file itself, opened on first use by `MetaEntry::sst`.
    sst: OnceLock<StaticSortedFile>,
}
120
// These impls assert that a `MetaEntry` may be moved to and shared between threads.
// NOTE(review): the field that prevents the auto traits from being derived is not visible in
// this file (presumably `amqf` or `sst`) — confirm that sharing it across threads is sound.
unsafe impl Send for MetaEntry {}
unsafe impl Sync for MetaEntry {}
124
125impl MetaEntry {
126 pub fn sequence_number(&self) -> u32 {
127 self.sst_data.sequence_number
128 }
129
130 pub fn size(&self) -> u64 {
131 self.size
132 }
133
134 pub fn flags(&self) -> MetaEntryFlags {
135 self.flags
136 }
137
138 pub fn amqf_size(&self) -> u32 {
139 self.amqf_data_offset.end - self.amqf_data_offset.start
140 }
141
142 pub fn raw_amqf<'l>(&self, amqf_data: &'l [u8]) -> &'l [u8] {
144 &amqf_data[self.amqf_data_offset.start as usize..self.amqf_data_offset.end as usize]
145 }
146
147 fn sst(&self, meta: &MetaFile) -> Result<&StaticSortedFile> {
148 self.sst.get_or_try_init(|| {
149 StaticSortedFile::open(&meta.db_path, self.sst_data).with_context(|| {
150 format!(
151 "Unable to open static sorted file referenced from {:08}.meta",
152 meta.sequence_number()
153 )
154 })
155 })
156 }
157
158 pub fn range(&self) -> StaticSortedFileRange {
160 StaticSortedFileRange {
161 family: self.family,
162 min_hash: self.min_hash,
163 max_hash: self.max_hash,
164 }
165 }
166
167 pub fn min_hash(&self) -> u64 {
168 self.min_hash
169 }
170
171 pub fn max_hash(&self) -> u64 {
172 self.max_hash
173 }
174
175 pub fn block_count(&self) -> u16 {
176 self.sst_data.block_count
177 }
178
179 pub fn sst_metadata(&self) -> StaticSortedFileMetaData {
182 self.sst_data
183 }
184}
185
/// Outcome of `MetaFile::lookup`.
pub enum MetaLookupResult {
    /// The requested key family differs from the family of the meta file.
    FamilyMiss,
    /// Default miss result: no visited entry reported an AMQF miss and no SST lookup produced
    /// values.
    RangeMiss,
    /// At least one entry covered the key hash but its AMQF did not contain the fingerprint,
    /// and no SST lookup produced values.
    QuickFilterMiss,
    /// Values returned by the SST file lookup(s).
    SstLookup(SstLookupResult),
}
198
/// Statistics returned by `MetaFile::batch_lookup`.
///
/// All fields only exist when the `stats` feature is enabled; without it the struct is empty.
#[derive(Default)]
pub struct MetaBatchLookupResult {
    /// Set when the requested key family differs from the family of the meta file.
    #[cfg(feature = "stats")]
    pub family_miss: bool,
    /// Number of entries whose hash range contained none of the requested key hashes.
    #[cfg(feature = "stats")]
    pub range_misses: usize,
    /// Number of (entry, key) checks rejected by the entry's AMQF.
    #[cfg(feature = "stats")]
    pub quick_filter_misses: usize,
    /// Number of SST lookups that did not find the key.
    #[cfg(feature = "stats")]
    pub sst_misses: usize,
    /// Number of keys for which a value was found.
    #[cfg(feature = "stats")]
    pub hits: usize,
}
219
/// Family and inclusive key hash range covered by one SST file, as returned by
/// `MetaEntry::range`.
#[derive(Clone, Copy)]
pub struct StaticSortedFileRange {
    /// Family of the meta file the SST file belongs to.
    pub family: u32,
    /// Smallest key hash covered (inclusive).
    pub min_hash: u64,
    /// Largest key hash covered (inclusive).
    pub max_hash: u64,
}
227
/// A memory-mapped `<sequence_number>.meta` file together with its parsed entries.
///
/// Field order matters: `entries` contain AMQF values that refer to the bytes of `mmap` (their
/// lifetime is extended to `'static` in `open_internal`). Struct fields are dropped in
/// declaration order, so `mmap` is declared last and outlives `entries` during drop.
pub struct MetaFile {
    /// Directory the meta file was opened from; SST files are opened relative to it.
    db_path: PathBuf,
    /// Sequence number of this meta file (used for its file name and in error messages).
    sequence_number: u32,
    /// Family stored in the file header; lookups for other families miss immediately.
    family: u32,
    /// Entries in file order. Lookups iterate them in reverse order.
    entries: Vec<MetaEntry>,
    /// SST sequence numbers of entries removed by `retain_entries`.
    obsolete_entries: Vec<u32>,
    /// List of SST sequence numbers read from the file header.
    obsolete_sst_files: Vec<u32>,
    /// Byte offset inside `mmap` at which the AMQF data section starts.
    amqf_data_start: u32,
    /// Start of the used-key-hashes AMQF, relative to the AMQF data section. Equal to the end
    /// offset of the last entry's AMQF.
    start_of_used_keys_amqf_data_offset: u32,
    /// End of the used-key-hashes AMQF, relative to the AMQF data section, as read from the
    /// file. Equal to the start offset when the file has no such AMQF.
    end_of_used_keys_amqf_data_offset: u32,
    /// Read-only mapping of the whole meta file. Must stay the last field (see above).
    mmap: Mmap,
}
258
259impl MetaFile {
260 pub fn open(db_path: &Path, sequence_number: u32) -> Result<Self> {
263 let filename = format!("{sequence_number:08}.meta");
264 let path = db_path.join(&filename);
265 Self::open_internal(db_path.to_path_buf(), sequence_number, &path)
266 .with_context(|| format!("Unable to open meta file {filename}"))
267 }
268
269 fn open_internal(db_path: PathBuf, sequence_number: u32, path: &Path) -> Result<Self> {
270 let file = File::open(path).context("Failed to open meta file")?;
271 let mmap = unsafe { MmapOptions::new().map(&file) }.context("Failed to mmap")?;
272 #[cfg(unix)]
273 mmap.advise(memmap2::Advice::Random)
274 .context("Failed to advise mmap")?;
275 advise_mmap_for_persistence(&mmap)?;
276 let mut reader: &[u8] = &mmap;
278 let magic = reader.read_u32::<BE>()?;
279 if magic != 0xFE4ADA4A {
280 bail!("Invalid magic number");
281 }
282 let family = reader.read_u32::<BE>()?;
283 let obsolete_count = reader.read_u32::<BE>()?;
284 let mut obsolete_sst_files = Vec::with_capacity(obsolete_count as usize);
285 for _ in 0..obsolete_count {
286 obsolete_sst_files.push(reader.read_u32::<BE>()?);
287 }
288
289 let count = reader.read_u32::<BE>()?;
290
291 let header_so_far = (mmap.len() - reader.len()) as u32;
294 let amqf_data_start =
295 header_so_far + count * (size_of::<EntryHeader>() as u32) + size_of::<u32>() as u32;
296 let amqf_data = &mmap[amqf_data_start as usize..];
297
298 let mut entries = Vec::with_capacity(count as usize);
300 let mut start_of_amqf_data_offset: u32 = 0;
301 for _ in 0..count {
302 let (header, rest): (Ref<&[u8], EntryHeader>, _) = Ref::from_prefix(reader)
303 .ok()
304 .context("Entry header out of bounds")?;
305 reader = rest;
306 let sst_data = StaticSortedFileMetaData {
307 sequence_number: header.sequence_number.get(),
308 block_count: header.block_count.get(),
309 };
310 let min_hash = header.min_hash.get();
311 let max_hash = header.max_hash.get();
312 let size = header.size.get();
313 let flags = MetaEntryFlags(header.flags.get());
314 let end_of_amqf_data_offset = header.amqf_end_offset.get();
315
316 let amqf_bytes = amqf_data
317 .get(start_of_amqf_data_offset as usize..end_of_amqf_data_offset as usize)
318 .expect("AMQF data out of bounds");
319 let amqf: qfilter::FilterRef<'_> =
321 postcard::from_bytes(amqf_bytes).with_context(|| {
322 format!(
323 "Failed to deserialize AMQF from {:08}.meta for {:08}.sst",
324 sequence_number, sst_data.sequence_number
325 )
326 })?;
327 let amqf: qfilter::FilterRef<'static> = unsafe { std::mem::transmute(amqf) };
330
331 entries.push(MetaEntry {
332 sst_data,
333 family,
334 min_hash,
335 max_hash,
336 size,
337 flags,
338 amqf_data_offset: start_of_amqf_data_offset..end_of_amqf_data_offset,
339 amqf,
340 sst: OnceLock::new(),
341 });
342 start_of_amqf_data_offset = end_of_amqf_data_offset;
343 }
344
345 let start_of_used_keys_amqf_data_offset = start_of_amqf_data_offset;
346 let end_of_used_keys_amqf_data_offset = reader.read_u32::<BE>()?;
347
348 Ok(Self {
349 db_path,
350 sequence_number,
351 family,
352 entries,
353 obsolete_entries: Vec::new(),
354 obsolete_sst_files,
355 amqf_data_start,
356 start_of_used_keys_amqf_data_offset,
357 end_of_used_keys_amqf_data_offset,
358 mmap,
359 })
360 }
361
362 pub fn clear_cache(&mut self) {
363 for entry in self.entries.iter_mut() {
364 entry.sst.take();
365 }
366 }
367
368 pub fn prepare_sst_cache(&self) {
369 for entry in self.entries.iter() {
370 let _ = entry.sst(self);
371 }
372 }
373
374 pub fn sequence_number(&self) -> u32 {
375 self.sequence_number
376 }
377
378 pub fn family(&self) -> u32 {
379 self.family
380 }
381
382 pub fn entries(&self) -> &[MetaEntry] {
383 &self.entries
384 }
385
386 pub fn entry(&self, index: u32) -> &MetaEntry {
387 let index = index as usize;
388 &self.entries[index]
389 }
390
391 pub fn amqf_data(&self) -> &[u8] {
392 &self.mmap[self.amqf_data_start as usize..]
393 }
394
395 pub fn deserialize_used_key_hashes_amqf(&self) -> Result<Option<qfilter::FilterRef<'_>>> {
396 if self.start_of_used_keys_amqf_data_offset == self.end_of_used_keys_amqf_data_offset {
397 return Ok(None);
398 }
399 let amqf = &self.amqf_data()[self.start_of_used_keys_amqf_data_offset as usize
400 ..self.end_of_used_keys_amqf_data_offset as usize];
401 Ok(Some(postcard::from_bytes(amqf).with_context(|| {
402 format!(
403 "Failed to deserialize used key hashes AMQF from {:08}.meta",
404 self.sequence_number
405 )
406 })?))
407 }
408
409 pub fn retain_entries(&mut self, mut predicate: impl FnMut(u32) -> bool) -> bool {
410 let old_len = self.entries.len();
411 self.entries.retain(|entry| {
412 if predicate(entry.sst_data.sequence_number) {
413 true
414 } else {
415 self.obsolete_entries.push(entry.sst_data.sequence_number);
416 false
417 }
418 });
419 old_len != self.entries.len()
420 }
421
422 pub fn obsolete_entries(&self) -> &[u32] {
423 &self.obsolete_entries
424 }
425
426 pub fn has_active_entries(&self) -> bool {
427 !self.entries.is_empty()
428 }
429
430 pub fn obsolete_sst_files(&self) -> &[u32] {
431 &self.obsolete_sst_files
432 }
433
434 pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
440 &self,
441 key_family: u32,
442 key_hash: u64,
443 key: &K,
444 key_block_cache: &BlockCache,
445 value_block_cache: &BlockCache,
446 ) -> Result<MetaLookupResult> {
447 if key_family != self.family {
448 return Ok(MetaLookupResult::FamilyMiss);
449 }
450 let mut miss_result = MetaLookupResult::RangeMiss;
451 let mut all_results: SmallVec<[LookupValue; 1]> = SmallVec::new();
452
453 for entry in self.entries.iter().rev() {
454 if key_hash < entry.min_hash || key_hash > entry.max_hash {
455 continue;
456 }
457 if !entry.amqf.contains_fingerprint(key_hash) {
458 miss_result = MetaLookupResult::QuickFilterMiss;
459 continue;
460 }
461
462 let result = entry.sst(self)?.lookup::<K, FIND_ALL>(
463 key_hash,
464 key,
465 key_block_cache,
466 value_block_cache,
467 )?;
468
469 match result {
470 SstLookupResult::NotFound => {
471 }
473 SstLookupResult::Found(values) => {
474 if !FIND_ALL {
475 return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(values)));
477 }
478 let has_tombstone = values.last().is_some_and(|v| *v == LookupValue::Deleted);
482 all_results.extend(values);
483 if has_tombstone {
484 return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
485 all_results,
486 )));
487 }
488 }
489 }
490 }
491
492 if FIND_ALL && !all_results.is_empty() {
493 return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
494 all_results,
495 )));
496 }
497
498 Ok(miss_result)
499 }
500
501 pub fn batch_lookup<K: QueryKey>(
502 &self,
503 key_family: u32,
504 keys: &[K],
505 cells: &mut [(u64, usize, Option<LookupValue>)],
506 empty_cells: &mut usize,
507 key_block_cache: &BlockCache,
508 value_block_cache: &BlockCache,
509 ) -> Result<MetaBatchLookupResult> {
510 if key_family != self.family {
511 #[cfg(feature = "stats")]
512 return Ok(MetaBatchLookupResult {
513 family_miss: true,
514 ..Default::default()
515 });
516 #[cfg(not(feature = "stats"))]
517 return Ok(MetaBatchLookupResult {});
518 }
519 debug_assert!(
520 cells.is_sorted_by_key(|(hash, _, _)| *hash),
521 "Cells must be sorted by key hash"
522 );
523 #[allow(unused_mut, reason = "It's used when stats are enabled")]
524 let mut lookup_result = MetaBatchLookupResult::default();
525 for entry in self.entries.iter().rev() {
526 let start_index = cells
527 .binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
528 .err()
529 .unwrap();
530 if start_index >= cells.len() {
531 #[cfg(feature = "stats")]
532 {
533 lookup_result.range_misses += 1;
534 }
535 continue;
536 }
537 let end_index = cells
538 .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
539 .err()
540 .unwrap()
541 .checked_sub(1);
542 let Some(end_index) = end_index else {
543 #[cfg(feature = "stats")]
544 {
545 lookup_result.range_misses += 1;
546 }
547 continue;
548 };
549 if start_index > end_index {
550 #[cfg(feature = "stats")]
551 {
552 lookup_result.range_misses += 1;
553 }
554 continue;
555 }
556 for (hash, index, result) in &mut cells[start_index..=end_index] {
557 debug_assert!(
558 *hash >= entry.min_hash && *hash <= entry.max_hash,
559 "Key hash out of range"
560 );
561 if result.is_some() {
562 continue;
563 }
564 if !entry.amqf.contains_fingerprint(*hash) {
565 #[cfg(feature = "stats")]
566 {
567 lookup_result.quick_filter_misses += 1;
568 }
569 continue;
570 }
571 let sst_result = entry.sst(self)?.lookup::<_, false>(
572 *hash,
573 &keys[*index],
574 key_block_cache,
575 value_block_cache,
576 )?;
577 if let SstLookupResult::Found(mut values) = sst_result {
578 debug_assert!(values.len() == 1);
580 let Some(value) = values.pop() else {
581 unreachable!()
582 };
583 *result = Some(value);
584 *empty_cells -= 1;
585 #[cfg(feature = "stats")]
586 {
587 lookup_result.hits += 1;
588 }
589 if *empty_cells == 0 {
590 return Ok(lookup_result);
591 }
592 } else {
593 #[cfg(feature = "stats")]
594 {
595 lookup_result.sst_misses += 1;
596 }
597 }
598 }
599 }
600 Ok(lookup_result)
601 }
602}