Skip to main content

turbo_persistence/
meta_file.rs

1use std::{
2    cmp::Ordering,
3    fmt::Display,
4    fs::File,
5    path::{Path, PathBuf},
6    sync::OnceLock,
7};
8
9use anyhow::{Context, Result, bail};
10use bitfield::bitfield;
11use byteorder::{BE, ReadBytesExt};
12use memmap2::{Mmap, MmapOptions};
13use smallvec::SmallVec;
14use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, big_endian as be};
15
16use crate::{
17    QueryKey,
18    lookup_entry::LookupValue,
19    mmap_helper::advise_mmap_for_persistence,
20    static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData},
21};
22
23bitfield! {
24    #[derive(Clone, Copy, Default)]
25    pub struct MetaEntryFlags(u32);
26    impl Debug;
27    impl From<u32>;
28    /// The SST file was compacted and none of the entries have been accessed recently.
29    pub cold, set_cold: 0;
30    /// The SST file was freshly written and has not been compacted yet.
31    pub fresh, set_fresh: 1;
32}
33
34impl MetaEntryFlags {
35    pub const FRESH: MetaEntryFlags = MetaEntryFlags(0b10);
36    pub const COLD: MetaEntryFlags = MetaEntryFlags(0b01);
37    pub const WARM: MetaEntryFlags = MetaEntryFlags(0b00);
38}
39
40impl Display for MetaEntryFlags {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        if self.fresh() {
43            f.pad_integral(true, "", "fresh")
44        } else if self.cold() {
45            f.pad_integral(true, "", "cold")
46        } else {
47            f.pad_integral(true, "", "warm")
48        }
49    }
50}
51
52/// On-disk layout of a single entry header in the `.meta` file.
53///
54/// Fields are big-endian to match the existing wire format written by [`MetaFileBuilder`].
55#[repr(C, packed)]
56#[derive(FromBytes, IntoBytes, Immutable, KnownLayout, Clone, Copy)]
57pub(crate) struct EntryHeader {
58    sequence_number: be::U32,
59    block_count: be::U16,
60    min_hash: be::U64,
61    max_hash: be::U64,
62    size: be::U64,
63    flags: be::U32,
64    amqf_end_offset: be::U32,
65}
66
67impl EntryHeader {
68    pub(crate) fn new(
69        sequence_number: u32,
70        block_count: u16,
71        min_hash: u64,
72        max_hash: u64,
73        size: u64,
74        flags: MetaEntryFlags,
75        amqf_end_offset: u32,
76    ) -> Self {
77        Self {
78            sequence_number: be::U32::new(sequence_number),
79            block_count: be::U16::new(block_count),
80            min_hash: be::U64::new(min_hash),
81            max_hash: be::U64::new(max_hash),
82            size: be::U64::new(size),
83            flags: be::U32::new(flags.0),
84            amqf_end_offset: be::U32::new(amqf_end_offset),
85        }
86    }
87}
88
89/// # Safety
90///
91/// `MetaEntry` stores a `FilterRef<'static>` with a transmuted lifetime that actually borrows
92/// from the parent [`MetaFile`]'s mmap. This is safe because entries are only accessed by
93/// reference through `MetaFile` and are never moved out.
94///
95/// For this reason this type should not implement Clone or Copy.
96pub struct MetaEntry {
97    /// The metadata for the static sorted file.
98    sst_data: StaticSortedFileMetaData,
99    /// The key family of the SST file.
100    family: u32,
101    /// The minimum hash value of the keys in the SST file.
102    min_hash: u64,
103    /// The maximum hash value of the keys in the SST file.
104    max_hash: u64,
105    /// The size of the SST file in bytes.
106    size: u64,
107    /// The status flags for this entry.
108    flags: MetaEntryFlags,
109    /// Byte offset range of the raw AMQF data within the mmap, used for carrying forward
110    /// serialized bytes during compaction without re-serializing.
111    amqf_data_offset: std::ops::Range<u32>,
112    /// The AMQF filter for this file, eagerly deserialized as a zero-copy [`qfilter::FilterRef`]
113    /// that borrows directly from the parent [`MetaFile`]'s memory-mapped file.
114    ///
115    /// The `'static` lifetime is transmuted — the actual borrow is from `MetaFile::mmap`.
116    amqf: qfilter::FilterRef<'static>,
117    /// The static sorted file that is lazily loaded
118    sst: OnceLock<StaticSortedFile>,
119}
120
121// Safety: FilterRef is a read-only view into the mmap which is Send+Sync.
122unsafe impl Send for MetaEntry {}
123unsafe impl Sync for MetaEntry {}
124
125impl MetaEntry {
126    pub fn sequence_number(&self) -> u32 {
127        self.sst_data.sequence_number
128    }
129
130    pub fn size(&self) -> u64 {
131        self.size
132    }
133
134    pub fn flags(&self) -> MetaEntryFlags {
135        self.flags
136    }
137
138    pub fn amqf_size(&self) -> u32 {
139        self.amqf_data_offset.end - self.amqf_data_offset.start
140    }
141
142    /// Returns the raw serialized AMQF bytes from the mmap.
143    pub fn raw_amqf<'l>(&self, amqf_data: &'l [u8]) -> &'l [u8] {
144        &amqf_data[self.amqf_data_offset.start as usize..self.amqf_data_offset.end as usize]
145    }
146
147    fn sst(&self, meta: &MetaFile) -> Result<&StaticSortedFile> {
148        self.sst.get_or_try_init(|| {
149            StaticSortedFile::open(&meta.db_path, self.sst_data).with_context(|| {
150                format!(
151                    "Unable to open static sorted file referenced from {:08}.meta",
152                    meta.sequence_number()
153                )
154            })
155        })
156    }
157
158    /// Returns the key family and hash range of this file.
159    pub fn range(&self) -> StaticSortedFileRange {
160        StaticSortedFileRange {
161            family: self.family,
162            min_hash: self.min_hash,
163            max_hash: self.max_hash,
164        }
165    }
166
167    pub fn min_hash(&self) -> u64 {
168        self.min_hash
169    }
170
171    pub fn max_hash(&self) -> u64 {
172        self.max_hash
173    }
174
175    pub fn block_count(&self) -> u16 {
176        self.sst_data.block_count
177    }
178
179    /// Returns the SST metadata needed to open the file independently.
180    /// Used during compaction to avoid caching mmaps on the MetaEntry.
181    pub fn sst_metadata(&self) -> StaticSortedFileMetaData {
182        self.sst_data
183    }
184}
185
186/// The result of a lookup operation.
187pub enum MetaLookupResult {
188    /// The key was not found because it is from a different key family.
189    FamilyMiss,
190    /// The key was not found because it is out of the range of this SST file. But it was the
191    /// correct key family.
192    RangeMiss,
193    /// The key was not found because it was not in the AMQF filter. But it was in the range.
194    QuickFilterMiss,
195    /// The key was looked up in the SST file. It was in the AMQF filter.
196    SstLookup(SstLookupResult),
197}
198
/// Statistics gathered by [`MetaFile::batch_lookup`].
///
/// Every field only exists when the `stats` feature is enabled; without it the struct is empty.
#[derive(Default)]
pub struct MetaBatchLookupResult {
    /// `true` when the whole batch was skipped because the key family did not match.
    #[cfg(feature = "stats")]
    pub family_miss: bool,
    /// Number of SST entries skipped because no requested hash fell into their hash range
    /// (the key family matched).
    #[cfg(feature = "stats")]
    pub range_misses: usize,
    /// Number of probes that were inside an entry's hash range but were rejected by its AMQF
    /// filter.
    #[cfg(feature = "stats")]
    pub quick_filter_misses: usize,
    /// Number of probes that passed the AMQF filter but were not found in the SST file.
    #[cfg(feature = "stats")]
    pub sst_misses: usize,
    /// Number of keys that were found in an SST file.
    #[cfg(feature = "stats")]
    pub hits: usize,
}
219
/// Key family plus the hash range (`min_hash` to `max_hash`) covered by one SST file.
#[derive(Copy, Clone)]
pub struct StaticSortedFileRange {
    /// Key family of the SST file.
    pub family: u32,
    /// Minimum key hash in the SST file.
    pub min_hash: u64,
    /// Maximum key hash in the SST file.
    pub max_hash: u64,
}
227
228/// # Safety
229///
230/// `entries` **must** be declared before `mmap` so that Rust's field drop order (declaration
231/// order) drops all `FilterRef`s before the mmap is unmapped.  Reordering these fields would
232/// be unsound.
233pub struct MetaFile {
234    /// The database path
235    db_path: PathBuf,
236    /// The sequence number of this file.
237    sequence_number: u32,
238    /// The key family of the SST files in this meta file.
239    family: u32,
240    /// The entries of the file. Dropped before `mmap` (field declaration order).
241    entries: Vec<MetaEntry>,
242    /// The entries that have been marked as obsolete.
243    obsolete_entries: Vec<u32>,
244    /// The obsolete SST files.
245    obsolete_sst_files: Vec<u32>,
246    /// Byte offset within the mmap where the AMQF data region starts (i.e. the header length).
247    /// Entry AMQF offsets and used-keys offsets are relative to this position.
248    amqf_data_start: u32,
249    /// The offset of the start of the "used keys" AMQF data relative to the AMQF data region.
250    start_of_used_keys_amqf_data_offset: u32,
251    /// The offset of the end of the "used keys" AMQF data relative to the AMQF data region.
252    end_of_used_keys_amqf_data_offset: u32,
253    /// The memory mapped file.
254    /// The entire memory-mapped file. Must be the last field that matters for drop order —
255    /// `entries` contains `FilterRef`s that borrow from this mmap.
256    mmap: Mmap,
257}
258
259impl MetaFile {
260    /// Opens a meta file at the given path. Memory maps the entire file and eagerly deserializes
261    /// all AMQF filters as zero-copy [`qfilter::FilterRef`]s that borrow from the mmap.
262    pub fn open(db_path: &Path, sequence_number: u32) -> Result<Self> {
263        let filename = format!("{sequence_number:08}.meta");
264        let path = db_path.join(&filename);
265        Self::open_internal(db_path.to_path_buf(), sequence_number, &path)
266            .with_context(|| format!("Unable to open meta file {filename}"))
267    }
268
269    fn open_internal(db_path: PathBuf, sequence_number: u32, path: &Path) -> Result<Self> {
270        let file = File::open(path).context("Failed to open meta file")?;
271        let mmap = unsafe { MmapOptions::new().map(&file) }.context("Failed to mmap")?;
272        #[cfg(unix)]
273        mmap.advise(memmap2::Advice::Random)
274            .context("Failed to advise mmap")?;
275        advise_mmap_for_persistence(&mmap)?;
276        // Parse the header from the mmap via ReadBytesExt on &[u8].
277        let mut reader: &[u8] = &mmap;
278        let magic = reader.read_u32::<BE>()?;
279        if magic != 0xFE4ADA4A {
280            bail!("Invalid magic number");
281        }
282        let family = reader.read_u32::<BE>()?;
283        let obsolete_count = reader.read_u32::<BE>()?;
284        let mut obsolete_sst_files = Vec::with_capacity(obsolete_count as usize);
285        for _ in 0..obsolete_count {
286            obsolete_sst_files.push(reader.read_u32::<BE>()?);
287        }
288
289        let count = reader.read_u32::<BE>()?;
290
291        // Compute where the AMQF data region starts so we can deserialize filters inline.
292        // Remaining header: count * ENTRY_HEADER_SIZE + used_keys_end_offset.
293        let header_so_far = (mmap.len() - reader.len()) as u32;
294        let amqf_data_start =
295            header_so_far + count * (size_of::<EntryHeader>() as u32) + size_of::<u32>() as u32;
296        let amqf_data = &mmap[amqf_data_start as usize..];
297
298        // Parse entries and eagerly deserialize AMQF filters as zero-copy FilterRefs.
299        let mut entries = Vec::with_capacity(count as usize);
300        let mut start_of_amqf_data_offset: u32 = 0;
301        for _ in 0..count {
302            let (header, rest): (Ref<&[u8], EntryHeader>, _) = Ref::from_prefix(reader)
303                .ok()
304                .context("Entry header out of bounds")?;
305            reader = rest;
306            let sst_data = StaticSortedFileMetaData {
307                sequence_number: header.sequence_number.get(),
308                block_count: header.block_count.get(),
309            };
310            let min_hash = header.min_hash.get();
311            let max_hash = header.max_hash.get();
312            let size = header.size.get();
313            let flags = MetaEntryFlags(header.flags.get());
314            let end_of_amqf_data_offset = header.amqf_end_offset.get();
315
316            let amqf_bytes = amqf_data
317                .get(start_of_amqf_data_offset as usize..end_of_amqf_data_offset as usize)
318                .expect("AMQF data out of bounds");
319            // Deserialize the filter borrowing from the mmap, then erase the lifetime.
320            let amqf: qfilter::FilterRef<'_> =
321                postcard::from_bytes(amqf_bytes).with_context(|| {
322                    format!(
323                        "Failed to deserialize AMQF from {:08}.meta for {:08}.sst",
324                        sequence_number, sst_data.sequence_number
325                    )
326                })?;
327            // Safety: the mmap is kept alive by MetaFile and is dropped after entries (field
328            // declaration order), so the borrow remains valid for the lifetime of the MetaEntry.
329            let amqf: qfilter::FilterRef<'static> = unsafe { std::mem::transmute(amqf) };
330
331            entries.push(MetaEntry {
332                sst_data,
333                family,
334                min_hash,
335                max_hash,
336                size,
337                flags,
338                amqf_data_offset: start_of_amqf_data_offset..end_of_amqf_data_offset,
339                amqf,
340                sst: OnceLock::new(),
341            });
342            start_of_amqf_data_offset = end_of_amqf_data_offset;
343        }
344
345        let start_of_used_keys_amqf_data_offset = start_of_amqf_data_offset;
346        let end_of_used_keys_amqf_data_offset = reader.read_u32::<BE>()?;
347
348        Ok(Self {
349            db_path,
350            sequence_number,
351            family,
352            entries,
353            obsolete_entries: Vec::new(),
354            obsolete_sst_files,
355            amqf_data_start,
356            start_of_used_keys_amqf_data_offset,
357            end_of_used_keys_amqf_data_offset,
358            mmap,
359        })
360    }
361
362    pub fn clear_cache(&mut self) {
363        for entry in self.entries.iter_mut() {
364            entry.sst.take();
365        }
366    }
367
368    pub fn prepare_sst_cache(&self) {
369        for entry in self.entries.iter() {
370            let _ = entry.sst(self);
371        }
372    }
373
374    pub fn sequence_number(&self) -> u32 {
375        self.sequence_number
376    }
377
378    pub fn family(&self) -> u32 {
379        self.family
380    }
381
382    pub fn entries(&self) -> &[MetaEntry] {
383        &self.entries
384    }
385
386    pub fn entry(&self, index: u32) -> &MetaEntry {
387        let index = index as usize;
388        &self.entries[index]
389    }
390
391    pub fn amqf_data(&self) -> &[u8] {
392        &self.mmap[self.amqf_data_start as usize..]
393    }
394
395    pub fn deserialize_used_key_hashes_amqf(&self) -> Result<Option<qfilter::FilterRef<'_>>> {
396        if self.start_of_used_keys_amqf_data_offset == self.end_of_used_keys_amqf_data_offset {
397            return Ok(None);
398        }
399        let amqf = &self.amqf_data()[self.start_of_used_keys_amqf_data_offset as usize
400            ..self.end_of_used_keys_amqf_data_offset as usize];
401        Ok(Some(postcard::from_bytes(amqf).with_context(|| {
402            format!(
403                "Failed to deserialize used key hashes AMQF from {:08}.meta",
404                self.sequence_number
405            )
406        })?))
407    }
408
409    pub fn retain_entries(&mut self, mut predicate: impl FnMut(u32) -> bool) -> bool {
410        let old_len = self.entries.len();
411        self.entries.retain(|entry| {
412            if predicate(entry.sst_data.sequence_number) {
413                true
414            } else {
415                self.obsolete_entries.push(entry.sst_data.sequence_number);
416                false
417            }
418        });
419        old_len != self.entries.len()
420    }
421
422    pub fn obsolete_entries(&self) -> &[u32] {
423        &self.obsolete_entries
424    }
425
426    pub fn has_active_entries(&self) -> bool {
427        !self.entries.is_empty()
428    }
429
430    pub fn obsolete_sst_files(&self) -> &[u32] {
431        &self.obsolete_sst_files
432    }
433
434    /// Looks up a key in this meta file.
435    ///
436    /// If `FIND_ALL` is false, returns after finding the first match.
437    /// If `FIND_ALL` is true, returns all entries with the same key from all SST files
438    /// (useful for keyspaces where keys are hashes and collisions are possible).
439    pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
440        &self,
441        key_family: u32,
442        key_hash: u64,
443        key: &K,
444        key_block_cache: &BlockCache,
445        value_block_cache: &BlockCache,
446    ) -> Result<MetaLookupResult> {
447        if key_family != self.family {
448            return Ok(MetaLookupResult::FamilyMiss);
449        }
450        let mut miss_result = MetaLookupResult::RangeMiss;
451        let mut all_results: SmallVec<[LookupValue; 1]> = SmallVec::new();
452
453        for entry in self.entries.iter().rev() {
454            if key_hash < entry.min_hash || key_hash > entry.max_hash {
455                continue;
456            }
457            if !entry.amqf.contains_fingerprint(key_hash) {
458                miss_result = MetaLookupResult::QuickFilterMiss;
459                continue;
460            }
461
462            let result = entry.sst(self)?.lookup::<K, FIND_ALL>(
463                key_hash,
464                key,
465                key_block_cache,
466                value_block_cache,
467            )?;
468
469            match result {
470                SstLookupResult::NotFound => {
471                    // continue searching other sst files
472                }
473                SstLookupResult::Found(values) => {
474                    if !FIND_ALL {
475                        // Return immediately with the first result
476                        return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(values)));
477                    }
478                    // Check for tombstone — stops search across older SSTs within this meta file.
479                    // Since tombstones sort last within a key group, if the last value is Deleted,
480                    // we have a tombstone.
481                    let has_tombstone = values.last().is_some_and(|v| *v == LookupValue::Deleted);
482                    all_results.extend(values);
483                    if has_tombstone {
484                        return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
485                            all_results,
486                        )));
487                    }
488                }
489            }
490        }
491
492        if FIND_ALL && !all_results.is_empty() {
493            return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
494                all_results,
495            )));
496        }
497
498        Ok(miss_result)
499    }
500
501    pub fn batch_lookup<K: QueryKey>(
502        &self,
503        key_family: u32,
504        keys: &[K],
505        cells: &mut [(u64, usize, Option<LookupValue>)],
506        empty_cells: &mut usize,
507        key_block_cache: &BlockCache,
508        value_block_cache: &BlockCache,
509    ) -> Result<MetaBatchLookupResult> {
510        if key_family != self.family {
511            #[cfg(feature = "stats")]
512            return Ok(MetaBatchLookupResult {
513                family_miss: true,
514                ..Default::default()
515            });
516            #[cfg(not(feature = "stats"))]
517            return Ok(MetaBatchLookupResult {});
518        }
519        debug_assert!(
520            cells.is_sorted_by_key(|(hash, _, _)| *hash),
521            "Cells must be sorted by key hash"
522        );
523        #[allow(unused_mut, reason = "It's used when stats are enabled")]
524        let mut lookup_result = MetaBatchLookupResult::default();
525        for entry in self.entries.iter().rev() {
526            let start_index = cells
527                .binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
528                .err()
529                .unwrap();
530            if start_index >= cells.len() {
531                #[cfg(feature = "stats")]
532                {
533                    lookup_result.range_misses += 1;
534                }
535                continue;
536            }
537            let end_index = cells
538                .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
539                .err()
540                .unwrap()
541                .checked_sub(1);
542            let Some(end_index) = end_index else {
543                #[cfg(feature = "stats")]
544                {
545                    lookup_result.range_misses += 1;
546                }
547                continue;
548            };
549            if start_index > end_index {
550                #[cfg(feature = "stats")]
551                {
552                    lookup_result.range_misses += 1;
553                }
554                continue;
555            }
556            for (hash, index, result) in &mut cells[start_index..=end_index] {
557                debug_assert!(
558                    *hash >= entry.min_hash && *hash <= entry.max_hash,
559                    "Key hash out of range"
560                );
561                if result.is_some() {
562                    continue;
563                }
564                if !entry.amqf.contains_fingerprint(*hash) {
565                    #[cfg(feature = "stats")]
566                    {
567                        lookup_result.quick_filter_misses += 1;
568                    }
569                    continue;
570                }
571                let sst_result = entry.sst(self)?.lookup::<_, false>(
572                    *hash,
573                    &keys[*index],
574                    key_block_cache,
575                    value_block_cache,
576                )?;
577                if let SstLookupResult::Found(mut values) = sst_result {
578                    // find_all=false guarantees exactly one result
579                    debug_assert!(values.len() == 1);
580                    let Some(value) = values.pop() else {
581                        unreachable!()
582                    };
583                    *result = Some(value);
584                    *empty_cells -= 1;
585                    #[cfg(feature = "stats")]
586                    {
587                        lookup_result.hits += 1;
588                    }
589                    if *empty_cells == 0 {
590                        return Ok(lookup_result);
591                    }
592                } else {
593                    #[cfg(feature = "stats")]
594                    {
595                        lookup_result.sst_misses += 1;
596                    }
597                }
598            }
599        }
600        Ok(lookup_result)
601    }
602}