1use std::{
2 cmp::Ordering,
3 fmt::Display,
4 fs::File,
5 io::{BufReader, Seek},
6 ops::Deref,
7 path::{Path, PathBuf},
8 sync::OnceLock,
9};
10
11use anyhow::{Context, Result, bail};
12use bincode::{Decode, Encode};
13use bitfield::bitfield;
14use byteorder::{BE, ReadBytesExt};
15use memmap2::{Mmap, MmapOptions};
16use smallvec::SmallVec;
17use turbo_bincode::turbo_bincode_decode;
18
19use crate::{
20 QueryKey,
21 lookup_entry::LookupValue,
22 mmap_helper::advise_mmap_for_persistence,
23 static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData},
24};
25
// Per-entry flag word of a meta file, backed by a `u32`.
//
// The `bitfield!` macro generates the `cold()`/`set_cold()` and `fresh()`/`set_fresh()`
// accessors for the single-bit fields declared below, plus `Debug` and `From<u32>`.
bitfield! {
    #[derive(Clone, Copy, Default)]
    pub struct MetaEntryFlags(u32);
    impl Debug;
    impl From<u32>;
    // Bit 0: the "cold" flag.
    pub cold, set_cold: 0;
    // Bit 1: the "fresh" flag.
    pub fresh, set_fresh: 1;
}
36
37impl MetaEntryFlags {
38 pub const FRESH: MetaEntryFlags = MetaEntryFlags(0b10);
39 pub const COLD: MetaEntryFlags = MetaEntryFlags(0b01);
40 pub const WARM: MetaEntryFlags = MetaEntryFlags(0b00);
41}
42
43impl Display for MetaEntryFlags {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 if self.fresh() {
46 f.pad_integral(true, "", "fresh")
47 } else if self.cold() {
48 f.pad_integral(true, "", "cold")
49 } else {
50 f.pad_integral(true, "", "warm")
51 }
52 }
53}
54
/// Newtype around [`qfilter::Filter`] that derives bincode `Encode`/`Decode`.
///
/// The wrapped filter is encoded/decoded through `turbo_bincode::serde_self_describing`.
/// `MetaEntry::deserialize_amqf` decodes the per-entry AMQF bytes via this wrapper and then
/// unwraps field `.0`.
#[derive(Encode, Decode)]
pub struct AmqfBincodeWrapper(
    #[bincode(with = "turbo_bincode::serde_self_describing")] pub qfilter::Filter,
);
62
/// One entry of a meta file, describing a single static sorted file (SST).
pub struct MetaEntry {
    /// Sequence number and block count of the SST, as read from the meta file.
    sst_data: StaticSortedFileMetaData,
    /// Family of the SST; copied from the header of the owning meta file.
    family: u32,
    /// Lower bound (inclusive) of the key hashes; lookups skip the entry below this value.
    min_hash: u64,
    /// Upper bound (inclusive) of the key hashes; lookups skip the entry above this value.
    max_hash: u64,
    /// Size value recorded in the meta file (presumably the SST size in bytes — confirm
    /// against the writer).
    size: u64,
    /// Flag word read from the meta file.
    flags: MetaEntryFlags,
    /// Start offset of this entry's serialized AMQF within the meta file's AMQF data.
    /// Equals the previous entry's end offset (0 for the first entry).
    start_of_amqf_data_offset: u32,
    /// End offset (exclusive) of this entry's serialized AMQF within the AMQF data.
    end_of_amqf_data_offset: u32,
    /// Lazily deserialized AMQF; filled on first call to `amqf()`.
    amqf: OnceLock<qfilter::Filter>,
    /// Lazily opened SST; filled on first call to `sst()`.
    sst: OnceLock<StaticSortedFile>,
}
88
89impl MetaEntry {
90 pub fn sequence_number(&self) -> u32 {
91 self.sst_data.sequence_number
92 }
93
94 pub fn size(&self) -> u64 {
95 self.size
96 }
97
98 pub fn flags(&self) -> MetaEntryFlags {
99 self.flags
100 }
101
102 pub fn amqf_size(&self) -> u32 {
103 self.end_of_amqf_data_offset - self.start_of_amqf_data_offset
104 }
105
106 pub fn raw_amqf<'l>(&self, amqf_data: &'l [u8]) -> &'l [u8] {
107 amqf_data
108 .get(self.start_of_amqf_data_offset as usize..self.end_of_amqf_data_offset as usize)
109 .expect("AMQF data out of bounds")
110 }
111
112 pub fn deserialize_amqf(&self, meta: &MetaFile) -> Result<qfilter::Filter> {
113 let amqf = self.raw_amqf(meta.amqf_data());
114 Ok(turbo_bincode_decode::<AmqfBincodeWrapper>(amqf)
115 .with_context(|| {
116 format!(
117 "Failed to deserialize AMQF from {:08}.meta for {:08}.sst",
118 meta.sequence_number,
119 self.sequence_number()
120 )
121 })?
122 .0)
123 }
124
125 pub fn amqf(&self, meta: &MetaFile) -> Result<impl Deref<Target = qfilter::Filter>> {
126 self.amqf.get_or_try_init(|| {
127 let amqf = self.deserialize_amqf(meta)?;
128 anyhow::Ok(amqf)
129 })
130 }
131
132 pub fn sst(&self, meta: &MetaFile) -> Result<&StaticSortedFile> {
133 self.sst.get_or_try_init(|| {
134 StaticSortedFile::open(&meta.db_path, self.sst_data).with_context(|| {
135 format!(
136 "Unable to open static sorted file referenced from {:08}.meta",
137 meta.sequence_number()
138 )
139 })
140 })
141 }
142
143 pub fn range(&self) -> StaticSortedFileRange {
145 StaticSortedFileRange {
146 family: self.family,
147 min_hash: self.min_hash,
148 max_hash: self.max_hash,
149 }
150 }
151
152 pub fn min_hash(&self) -> u64 {
153 self.min_hash
154 }
155
156 pub fn max_hash(&self) -> u64 {
157 self.max_hash
158 }
159
160 pub fn block_count(&self) -> u16 {
161 self.sst_data.block_count
162 }
163
164 pub fn sst_metadata(&self) -> StaticSortedFileMetaData {
167 self.sst_data
168 }
169}
170
/// Outcome of [`MetaFile::lookup`].
pub enum MetaLookupResult {
    /// The requested key family differs from the family of the meta file.
    FamilyMiss,
    /// Nothing was found and no entry's AMQF rejected the key (the default miss result).
    RangeMiss,
    /// Nothing was found and at least one entry whose hash range contains the key rejected it
    /// via its AMQF.
    QuickFilterMiss,
    /// Result obtained from looking the key up in one or more SST files.
    SstLookup(SstLookupResult),
}
183
/// Statistics returned by [`MetaFile::batch_lookup`].
///
/// All fields only exist when the `stats` feature is enabled; otherwise the struct is empty.
#[derive(Default)]
pub struct MetaBatchLookupResult {
    /// `true` when the requested key family differs from the family of the meta file.
    #[cfg(feature = "stats")]
    pub family_miss: bool,
    /// Number of entries whose hash range contained none of the requested cells.
    #[cfg(feature = "stats")]
    pub range_misses: usize,
    /// Number of cell checks rejected by an entry's AMQF.
    #[cfg(feature = "stats")]
    pub quick_filter_misses: usize,
    /// Number of SST lookups that passed the AMQF but did not find the key.
    #[cfg(feature = "stats")]
    pub sst_misses: usize,
    /// Number of cells that were filled with a value.
    #[cfg(feature = "stats")]
    pub hits: usize,
}
204
/// Family and key-hash range of an SST, as returned by [`MetaEntry::range`].
#[derive(Clone, Copy)]
pub struct StaticSortedFileRange {
    /// Family the SST belongs to.
    pub family: u32,
    /// Lower bound of the key-hash range (taken from `MetaEntry::min_hash`).
    pub min_hash: u64,
    /// Upper bound of the key-hash range (taken from `MetaEntry::max_hash`).
    pub max_hash: u64,
}
212
/// An opened `{sequence_number:08}.meta` file: parsed header plus a memory map of the rest.
pub struct MetaFile {
    /// Directory containing the meta file; also used to open the referenced SST files.
    db_path: PathBuf,
    /// Sequence number of this meta file (determines its file name).
    sequence_number: u32,
    /// Family read from the file header; shared by all entries.
    family: u32,
    /// Entries in the order they appear in the file. Lookups iterate them in reverse.
    entries: Vec<MetaEntry>,
    /// SST sequence numbers of entries removed by `retain_entries`.
    obsolete_entries: Vec<u32>,
    /// SST sequence numbers listed as obsolete in the file header.
    obsolete_sst_files: Vec<u32>,
    /// Start offset of the used-key-hashes AMQF within the AMQF data
    /// (equals the end offset of the last entry's AMQF).
    start_of_used_keys_amqf_data_offset: u32,
    /// End offset (exclusive) of the used-key-hashes AMQF within the AMQF data.
    end_of_used_keys_amqf_data_offset: u32,
    /// Memory map of the file contents following the header; holds the AMQF data.
    mmap: Mmap,
}
235
236impl MetaFile {
237 pub fn open(db_path: &Path, sequence_number: u32) -> Result<Self> {
240 let filename = format!("{sequence_number:08}.meta");
241 let path = db_path.join(&filename);
242 Self::open_internal(db_path.to_path_buf(), sequence_number, &path)
243 .with_context(|| format!("Unable to open meta file {filename}"))
244 }
245
246 fn open_internal(db_path: PathBuf, sequence_number: u32, path: &Path) -> Result<Self> {
247 let mut file = BufReader::new(File::open(path)?);
248 let magic = file.read_u32::<BE>()?;
249 if magic != 0xFE4ADA4A {
250 bail!("Invalid magic number");
251 }
252 let family = file.read_u32::<BE>()?;
253 let obsolete_count = file.read_u32::<BE>()?;
254 let mut obsolete_sst_files = Vec::with_capacity(obsolete_count as usize);
255 for _ in 0..obsolete_count {
256 let obsolete_sst = file.read_u32::<BE>()?;
257 obsolete_sst_files.push(obsolete_sst);
258 }
259 let count = file.read_u32::<BE>()?;
260 let mut entries = Vec::with_capacity(count as usize);
261 let mut start_of_amqf_data_offset = 0;
262 for _ in 0..count {
263 let entry = MetaEntry {
264 sst_data: StaticSortedFileMetaData {
265 sequence_number: file.read_u32::<BE>()?,
266 block_count: file.read_u16::<BE>()?,
267 },
268 family,
269 min_hash: file.read_u64::<BE>()?,
270 max_hash: file.read_u64::<BE>()?,
271 size: file.read_u64::<BE>()?,
272 flags: MetaEntryFlags(file.read_u32::<BE>()?),
273 start_of_amqf_data_offset,
274 end_of_amqf_data_offset: file.read_u32::<BE>()?,
275 amqf: OnceLock::new(),
276 sst: OnceLock::new(),
277 };
278 start_of_amqf_data_offset = entry.end_of_amqf_data_offset;
279 entries.push(entry);
280 }
281 let start_of_used_keys_amqf_data_offset = start_of_amqf_data_offset;
282 let end_of_used_keys_amqf_data_offset = file.read_u32::<BE>()?;
283
284 let offset = file.stream_position()?;
285 let file = file.into_inner();
286 let mut options = MmapOptions::new();
287 options.offset(offset);
288 let mmap = unsafe { options.map(&file) }
289 .with_context(|| format!("Failed to mmap meta file {}", path.display()))?;
290 #[cfg(unix)]
291 mmap.advise(memmap2::Advice::Random)?;
292 advise_mmap_for_persistence(&mmap)?;
293 let file = Self {
294 db_path,
295 sequence_number,
296 family,
297 entries,
298 obsolete_entries: Vec::new(),
299 obsolete_sst_files,
300 start_of_used_keys_amqf_data_offset,
301 end_of_used_keys_amqf_data_offset,
302 mmap,
303 };
304 Ok(file)
305 }
306
307 pub fn clear_cache(&mut self) {
308 for entry in self.entries.iter_mut() {
309 entry.amqf.take();
310 entry.sst.take();
311 }
312 }
313
314 pub fn prepare_sst_cache(&self) {
315 for entry in self.entries.iter() {
316 let _ = entry.sst(self);
317 let _ = entry.amqf(self);
318 }
319 }
320
321 pub fn sequence_number(&self) -> u32 {
322 self.sequence_number
323 }
324
325 pub fn family(&self) -> u32 {
326 self.family
327 }
328
329 pub fn entries(&self) -> &[MetaEntry] {
330 &self.entries
331 }
332
333 pub fn entry(&self, index: u32) -> &MetaEntry {
334 let index = index as usize;
335 &self.entries[index]
336 }
337
338 pub fn amqf_data(&self) -> &[u8] {
339 &self.mmap
340 }
341
342 pub fn deserialize_used_key_hashes_amqf(&self) -> Result<Option<qfilter::Filter>> {
343 if self.start_of_used_keys_amqf_data_offset == self.end_of_used_keys_amqf_data_offset {
344 return Ok(None);
345 }
346 let amqf = &self.amqf_data()[self.start_of_used_keys_amqf_data_offset as usize
347 ..self.end_of_used_keys_amqf_data_offset as usize];
348 Ok(Some(pot::from_slice(amqf).with_context(|| {
349 format!(
350 "Failed to deserialize used key hashes AMQF from {:08}.meta",
351 self.sequence_number
352 )
353 })?))
354 }
355
356 pub fn retain_entries(&mut self, mut predicate: impl FnMut(u32) -> bool) -> bool {
357 let old_len = self.entries.len();
358 self.entries.retain(|entry| {
359 if predicate(entry.sst_data.sequence_number) {
360 true
361 } else {
362 self.obsolete_entries.push(entry.sst_data.sequence_number);
363 false
364 }
365 });
366 old_len != self.entries.len()
367 }
368
369 pub fn obsolete_entries(&self) -> &[u32] {
370 &self.obsolete_entries
371 }
372
373 pub fn has_active_entries(&self) -> bool {
374 !self.entries.is_empty()
375 }
376
377 pub fn obsolete_sst_files(&self) -> &[u32] {
378 &self.obsolete_sst_files
379 }
380
381 pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
387 &self,
388 key_family: u32,
389 key_hash: u64,
390 key: &K,
391 key_block_cache: &BlockCache,
392 value_block_cache: &BlockCache,
393 ) -> Result<MetaLookupResult> {
394 if key_family != self.family {
395 return Ok(MetaLookupResult::FamilyMiss);
396 }
397 let mut miss_result = MetaLookupResult::RangeMiss;
398 let mut all_results: SmallVec<[LookupValue; 1]> = SmallVec::new();
399
400 for entry in self.entries.iter().rev() {
401 if key_hash < entry.min_hash || key_hash > entry.max_hash {
402 continue;
403 }
404 let amqf = entry.amqf(self)?;
405 if !amqf.contains_fingerprint(key_hash) {
406 miss_result = MetaLookupResult::QuickFilterMiss;
407 continue;
408 }
409
410 let result = entry.sst(self)?.lookup::<K, FIND_ALL>(
411 key_hash,
412 key,
413 key_block_cache,
414 value_block_cache,
415 )?;
416
417 match result {
418 SstLookupResult::NotFound => {
419 }
421 SstLookupResult::Found(values) => {
422 if !FIND_ALL {
423 return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(values)));
425 }
426 let has_tombstone = values.last().is_some_and(|v| *v == LookupValue::Deleted);
430 all_results.extend(values);
431 if has_tombstone {
432 return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
433 all_results,
434 )));
435 }
436 }
437 }
438 }
439
440 if FIND_ALL && !all_results.is_empty() {
441 return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
442 all_results,
443 )));
444 }
445
446 Ok(miss_result)
447 }
448
449 pub fn batch_lookup<K: QueryKey>(
450 &self,
451 key_family: u32,
452 keys: &[K],
453 cells: &mut [(u64, usize, Option<LookupValue>)],
454 empty_cells: &mut usize,
455 key_block_cache: &BlockCache,
456 value_block_cache: &BlockCache,
457 ) -> Result<MetaBatchLookupResult> {
458 if key_family != self.family {
459 #[cfg(feature = "stats")]
460 return Ok(MetaBatchLookupResult {
461 family_miss: true,
462 ..Default::default()
463 });
464 #[cfg(not(feature = "stats"))]
465 return Ok(MetaBatchLookupResult {});
466 }
467 debug_assert!(
468 cells.is_sorted_by_key(|(hash, _, _)| *hash),
469 "Cells must be sorted by key hash"
470 );
471 #[allow(unused_mut, reason = "It's used when stats are enabled")]
472 let mut lookup_result = MetaBatchLookupResult::default();
473 for entry in self.entries.iter().rev() {
474 let start_index = cells
475 .binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
476 .err()
477 .unwrap();
478 if start_index >= cells.len() {
479 #[cfg(feature = "stats")]
480 {
481 lookup_result.range_misses += 1;
482 }
483 continue;
484 }
485 let end_index = cells
486 .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
487 .err()
488 .unwrap()
489 .checked_sub(1);
490 let Some(end_index) = end_index else {
491 #[cfg(feature = "stats")]
492 {
493 lookup_result.range_misses += 1;
494 }
495 continue;
496 };
497 if start_index > end_index {
498 #[cfg(feature = "stats")]
499 {
500 lookup_result.range_misses += 1;
501 }
502 continue;
503 }
504 let amqf = entry.amqf(self)?;
505 for (hash, index, result) in &mut cells[start_index..=end_index] {
506 debug_assert!(
507 *hash >= entry.min_hash && *hash <= entry.max_hash,
508 "Key hash out of range"
509 );
510 if result.is_some() {
511 continue;
512 }
513 if !amqf.contains_fingerprint(*hash) {
514 #[cfg(feature = "stats")]
515 {
516 lookup_result.quick_filter_misses += 1;
517 }
518 continue;
519 }
520 let sst_result = entry.sst(self)?.lookup::<_, false>(
521 *hash,
522 &keys[*index],
523 key_block_cache,
524 value_block_cache,
525 )?;
526 if let SstLookupResult::Found(mut values) = sst_result {
527 debug_assert!(values.len() == 1);
529 let Some(value) = values.pop() else {
530 unreachable!()
531 };
532 *result = Some(value);
533 *empty_cells -= 1;
534 #[cfg(feature = "stats")]
535 {
536 lookup_result.hits += 1;
537 }
538 if *empty_cells == 0 {
539 return Ok(lookup_result);
540 }
541 } else {
542 #[cfg(feature = "stats")]
543 {
544 lookup_result.sst_misses += 1;
545 }
546 }
547 }
548 }
549 Ok(lookup_result)
550 }
551}