Skip to main content

turbo_persistence/
meta_file.rs

1use std::{
2    cmp::Ordering,
3    fmt::Display,
4    fs::File,
5    io::{BufReader, Seek},
6    ops::Deref,
7    path::{Path, PathBuf},
8    sync::OnceLock,
9};
10
11use anyhow::{Context, Result, bail};
12use bincode::{Decode, Encode};
13use bitfield::bitfield;
14use byteorder::{BE, ReadBytesExt};
15use memmap2::{Mmap, MmapOptions};
16use smallvec::SmallVec;
17use turbo_bincode::turbo_bincode_decode;
18
19use crate::{
20    QueryKey,
21    lookup_entry::LookupValue,
22    mmap_helper::advise_mmap_for_persistence,
23    static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData},
24};
25
bitfield! {
    /// Status flags stored for every SST entry of a meta file.
    ///
    /// Bit 0 is `cold` and bit 1 is `fresh`. An entry with neither bit set is rendered as
    /// "warm" by the `Display` implementation.
    #[derive(Clone, Copy, Default)]
    pub struct MetaEntryFlags(u32);
    impl Debug;
    impl From<u32>;
    /// The SST file was compacted and none of the entries have been accessed recently.
    pub cold, set_cold: 0;
    /// The SST file was freshly written and has not been compacted yet.
    pub fresh, set_fresh: 1;
}
36
37impl MetaEntryFlags {
38    pub const FRESH: MetaEntryFlags = MetaEntryFlags(0b10);
39    pub const COLD: MetaEntryFlags = MetaEntryFlags(0b01);
40    pub const WARM: MetaEntryFlags = MetaEntryFlags(0b00);
41}
42
43impl Display for MetaEntryFlags {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        if self.fresh() {
46            f.pad_integral(true, "", "fresh")
47        } else if self.cold() {
48            f.pad_integral(true, "", "cold")
49        } else {
50            f.pad_integral(true, "", "warm")
51        }
52    }
53}
54
/// A wrapper around [`qfilter::Filter`] that implements [`Encode`] and [`Decode`].
///
/// The filter is the only (public) field, so it can be moved out again with `.0`.
#[derive(Encode, Decode)]
pub struct AmqfBincodeWrapper(
    // this annotation can be replaced with `#[bincode(serde)]` once
    // <https://github.com/arthurprs/qfilter/issues/13> is resolved
    #[bincode(with = "turbo_bincode::serde_self_describing")] pub qfilter::Filter,
);
62
/// The description of a single SST file as stored in a meta file, together with lazily
/// initialized handles to the file's AMQF and to the SST file itself.
pub struct MetaEntry {
    /// The metadata for the static sorted file.
    sst_data: StaticSortedFileMetaData,
    /// The key family of the SST file.
    family: u32,
    /// The minimum hash value of the keys in the SST file.
    min_hash: u64,
    /// The maximum hash value of the keys in the SST file.
    max_hash: u64,
    /// The size of the SST file in bytes.
    size: u64,
    /// The status flags for this entry.
    flags: MetaEntryFlags,
    /// The offset of the start of the AMQF data in the meta file relative to the end of the
    /// header.
    start_of_amqf_data_offset: u32,
    /// The offset of the end of the AMQF data in the meta file relative to the end of the
    /// header.
    end_of_amqf_data_offset: u32,
    /// The AMQF filter of this file. It is deserialized from the meta file on first use and
    /// kept until the owning meta file's cache is cleared.
    amqf: OnceLock<qfilter::Filter>,
    /// The static sorted file that is lazily loaded
    sst: OnceLock<StaticSortedFile>,
}
88
89impl MetaEntry {
90    pub fn sequence_number(&self) -> u32 {
91        self.sst_data.sequence_number
92    }
93
94    pub fn size(&self) -> u64 {
95        self.size
96    }
97
98    pub fn flags(&self) -> MetaEntryFlags {
99        self.flags
100    }
101
102    pub fn amqf_size(&self) -> u32 {
103        self.end_of_amqf_data_offset - self.start_of_amqf_data_offset
104    }
105
106    pub fn raw_amqf<'l>(&self, amqf_data: &'l [u8]) -> &'l [u8] {
107        amqf_data
108            .get(self.start_of_amqf_data_offset as usize..self.end_of_amqf_data_offset as usize)
109            .expect("AMQF data out of bounds")
110    }
111
112    pub fn deserialize_amqf(&self, meta: &MetaFile) -> Result<qfilter::Filter> {
113        let amqf = self.raw_amqf(meta.amqf_data());
114        Ok(turbo_bincode_decode::<AmqfBincodeWrapper>(amqf)
115            .with_context(|| {
116                format!(
117                    "Failed to deserialize AMQF from {:08}.meta for {:08}.sst",
118                    meta.sequence_number,
119                    self.sequence_number()
120                )
121            })?
122            .0)
123    }
124
125    pub fn amqf(&self, meta: &MetaFile) -> Result<impl Deref<Target = qfilter::Filter>> {
126        self.amqf.get_or_try_init(|| {
127            let amqf = self.deserialize_amqf(meta)?;
128            anyhow::Ok(amqf)
129        })
130    }
131
132    pub fn sst(&self, meta: &MetaFile) -> Result<&StaticSortedFile> {
133        self.sst.get_or_try_init(|| {
134            StaticSortedFile::open(&meta.db_path, self.sst_data).with_context(|| {
135                format!(
136                    "Unable to open static sorted file referenced from {:08}.meta",
137                    meta.sequence_number()
138                )
139            })
140        })
141    }
142
143    /// Returns the key family and hash range of this file.
144    pub fn range(&self) -> StaticSortedFileRange {
145        StaticSortedFileRange {
146            family: self.family,
147            min_hash: self.min_hash,
148            max_hash: self.max_hash,
149        }
150    }
151
152    pub fn min_hash(&self) -> u64 {
153        self.min_hash
154    }
155
156    pub fn max_hash(&self) -> u64 {
157        self.max_hash
158    }
159
160    pub fn block_count(&self) -> u16 {
161        self.sst_data.block_count
162    }
163
164    /// Returns the SST metadata needed to open the file independently.
165    /// Used during compaction to avoid caching mmaps on the MetaEntry.
166    pub fn sst_metadata(&self) -> StaticSortedFileMetaData {
167        self.sst_data
168    }
169}
170
/// The result of a lookup operation.
///
/// The first three variants are misses that were decided without a successful SST lookup;
/// `SstLookup` carries the result produced by the SST file(s).
pub enum MetaLookupResult {
    /// The key was not found because it is from a different key family.
    FamilyMiss,
    /// The key was not found because it is out of the range of this SST file. But it was the
    /// correct key family.
    RangeMiss,
    /// The key was not found because it was not in the AMQF filter. But it was in the range.
    QuickFilterMiss,
    /// The key was looked up in the SST file. It was in the AMQF filter.
    SstLookup(SstLookupResult),
}
183
/// The result of a batch lookup operation.
///
/// All fields are statistics and only exist when the `stats` feature is enabled; without it
/// this is an empty struct.
#[derive(Default)]
pub struct MetaBatchLookupResult {
    /// Whether the whole batch was skipped because the requested key family differs from the
    /// key family of the meta file.
    #[cfg(feature = "stats")]
    pub family_miss: bool,
    /// The number of SST entries that were skipped because none of the requested key hashes
    /// falls into their hash range.
    #[cfg(feature = "stats")]
    pub range_misses: usize,
    /// The number of keys that were in the range of an SST entry but not in its AMQF filter.
    #[cfg(feature = "stats")]
    pub quick_filter_misses: usize,
    /// The number of keys that were in the AMQF filter but were not found by the SST lookup.
    #[cfg(feature = "stats")]
    pub sst_misses: usize,
    /// The number of keys that were found in an SST file.
    #[cfg(feature = "stats")]
    pub hits: usize,
}
204
/// The key family and hash range of an SST file.
#[derive(Clone, Copy)]
pub struct StaticSortedFileRange {
    /// The key family of the SST file.
    pub family: u32,
    /// The minimum hash value of the keys in the SST file.
    pub min_hash: u64,
    /// The maximum hash value of the keys in the SST file.
    pub max_hash: u64,
}
212
/// An opened meta file: the parsed header (entries and obsolete SST list) plus a memory map of
/// the AMQF data that follows the header.
pub struct MetaFile {
    /// The database path
    db_path: PathBuf,
    /// The sequence number of this file.
    sequence_number: u32,
    /// The key family of the SST files in this meta file.
    family: u32,
    /// The entries of the file.
    entries: Vec<MetaEntry>,
    /// The SST sequence numbers of entries that were removed from `entries` by
    /// `retain_entries`.
    obsolete_entries: Vec<u32>,
    /// The obsolete SST files, as listed in the header of the meta file.
    obsolete_sst_files: Vec<u32>,
    /// The offset of the start of the "used keys" AMQF data in the meta file relative to the end
    /// of the header.
    start_of_used_keys_amqf_data_offset: u32,
    /// The offset of the end of the "used keys" AMQF data in the meta file relative to the end
    /// of the header.
    end_of_used_keys_amqf_data_offset: u32,
    /// The memory mapped file. The mapping starts at the end of the header, so AMQF offsets
    /// index directly into it.
    mmap: Mmap,
}
235
236impl MetaFile {
237    /// Opens a meta file at the given path. This memory maps the file, but does not read it yet.
238    /// It's lazy read on demand.
239    pub fn open(db_path: &Path, sequence_number: u32) -> Result<Self> {
240        let filename = format!("{sequence_number:08}.meta");
241        let path = db_path.join(&filename);
242        Self::open_internal(db_path.to_path_buf(), sequence_number, &path)
243            .with_context(|| format!("Unable to open meta file {filename}"))
244    }
245
246    fn open_internal(db_path: PathBuf, sequence_number: u32, path: &Path) -> Result<Self> {
247        let mut file = BufReader::new(File::open(path)?);
248        let magic = file.read_u32::<BE>()?;
249        if magic != 0xFE4ADA4A {
250            bail!("Invalid magic number");
251        }
252        let family = file.read_u32::<BE>()?;
253        let obsolete_count = file.read_u32::<BE>()?;
254        let mut obsolete_sst_files = Vec::with_capacity(obsolete_count as usize);
255        for _ in 0..obsolete_count {
256            let obsolete_sst = file.read_u32::<BE>()?;
257            obsolete_sst_files.push(obsolete_sst);
258        }
259        let count = file.read_u32::<BE>()?;
260        let mut entries = Vec::with_capacity(count as usize);
261        let mut start_of_amqf_data_offset = 0;
262        for _ in 0..count {
263            let entry = MetaEntry {
264                sst_data: StaticSortedFileMetaData {
265                    sequence_number: file.read_u32::<BE>()?,
266                    block_count: file.read_u16::<BE>()?,
267                },
268                family,
269                min_hash: file.read_u64::<BE>()?,
270                max_hash: file.read_u64::<BE>()?,
271                size: file.read_u64::<BE>()?,
272                flags: MetaEntryFlags(file.read_u32::<BE>()?),
273                start_of_amqf_data_offset,
274                end_of_amqf_data_offset: file.read_u32::<BE>()?,
275                amqf: OnceLock::new(),
276                sst: OnceLock::new(),
277            };
278            start_of_amqf_data_offset = entry.end_of_amqf_data_offset;
279            entries.push(entry);
280        }
281        let start_of_used_keys_amqf_data_offset = start_of_amqf_data_offset;
282        let end_of_used_keys_amqf_data_offset = file.read_u32::<BE>()?;
283
284        let offset = file.stream_position()?;
285        let file = file.into_inner();
286        let mut options = MmapOptions::new();
287        options.offset(offset);
288        let mmap = unsafe { options.map(&file) }
289            .with_context(|| format!("Failed to mmap meta file {}", path.display()))?;
290        #[cfg(unix)]
291        mmap.advise(memmap2::Advice::Random)?;
292        advise_mmap_for_persistence(&mmap)?;
293        let file = Self {
294            db_path,
295            sequence_number,
296            family,
297            entries,
298            obsolete_entries: Vec::new(),
299            obsolete_sst_files,
300            start_of_used_keys_amqf_data_offset,
301            end_of_used_keys_amqf_data_offset,
302            mmap,
303        };
304        Ok(file)
305    }
306
307    pub fn clear_cache(&mut self) {
308        for entry in self.entries.iter_mut() {
309            entry.amqf.take();
310            entry.sst.take();
311        }
312    }
313
314    pub fn prepare_sst_cache(&self) {
315        for entry in self.entries.iter() {
316            let _ = entry.sst(self);
317            let _ = entry.amqf(self);
318        }
319    }
320
321    pub fn sequence_number(&self) -> u32 {
322        self.sequence_number
323    }
324
325    pub fn family(&self) -> u32 {
326        self.family
327    }
328
329    pub fn entries(&self) -> &[MetaEntry] {
330        &self.entries
331    }
332
333    pub fn entry(&self, index: u32) -> &MetaEntry {
334        let index = index as usize;
335        &self.entries[index]
336    }
337
338    pub fn amqf_data(&self) -> &[u8] {
339        &self.mmap
340    }
341
342    pub fn deserialize_used_key_hashes_amqf(&self) -> Result<Option<qfilter::Filter>> {
343        if self.start_of_used_keys_amqf_data_offset == self.end_of_used_keys_amqf_data_offset {
344            return Ok(None);
345        }
346        let amqf = &self.amqf_data()[self.start_of_used_keys_amqf_data_offset as usize
347            ..self.end_of_used_keys_amqf_data_offset as usize];
348        Ok(Some(pot::from_slice(amqf).with_context(|| {
349            format!(
350                "Failed to deserialize used key hashes AMQF from {:08}.meta",
351                self.sequence_number
352            )
353        })?))
354    }
355
356    pub fn retain_entries(&mut self, mut predicate: impl FnMut(u32) -> bool) -> bool {
357        let old_len = self.entries.len();
358        self.entries.retain(|entry| {
359            if predicate(entry.sst_data.sequence_number) {
360                true
361            } else {
362                self.obsolete_entries.push(entry.sst_data.sequence_number);
363                false
364            }
365        });
366        old_len != self.entries.len()
367    }
368
369    pub fn obsolete_entries(&self) -> &[u32] {
370        &self.obsolete_entries
371    }
372
373    pub fn has_active_entries(&self) -> bool {
374        !self.entries.is_empty()
375    }
376
377    pub fn obsolete_sst_files(&self) -> &[u32] {
378        &self.obsolete_sst_files
379    }
380
381    /// Looks up a key in this meta file.
382    ///
383    /// If `FIND_ALL` is false, returns after finding the first match.
384    /// If `FIND_ALL` is true, returns all entries with the same key from all SST files
385    /// (useful for keyspaces where keys are hashes and collisions are possible).
386    pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
387        &self,
388        key_family: u32,
389        key_hash: u64,
390        key: &K,
391        key_block_cache: &BlockCache,
392        value_block_cache: &BlockCache,
393    ) -> Result<MetaLookupResult> {
394        if key_family != self.family {
395            return Ok(MetaLookupResult::FamilyMiss);
396        }
397        let mut miss_result = MetaLookupResult::RangeMiss;
398        let mut all_results: SmallVec<[LookupValue; 1]> = SmallVec::new();
399
400        for entry in self.entries.iter().rev() {
401            if key_hash < entry.min_hash || key_hash > entry.max_hash {
402                continue;
403            }
404            let amqf = entry.amqf(self)?;
405            if !amqf.contains_fingerprint(key_hash) {
406                miss_result = MetaLookupResult::QuickFilterMiss;
407                continue;
408            }
409
410            let result = entry.sst(self)?.lookup::<K, FIND_ALL>(
411                key_hash,
412                key,
413                key_block_cache,
414                value_block_cache,
415            )?;
416
417            match result {
418                SstLookupResult::NotFound => {
419                    // continue searching other sst files
420                }
421                SstLookupResult::Found(values) => {
422                    if !FIND_ALL {
423                        // Return immediately with the first result
424                        return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(values)));
425                    }
426                    // Check for tombstone — stops search across older SSTs within this meta file.
427                    // Since tombstones sort last within a key group, if the last value is Deleted,
428                    // we have a tombstone.
429                    let has_tombstone = values.last().is_some_and(|v| *v == LookupValue::Deleted);
430                    all_results.extend(values);
431                    if has_tombstone {
432                        return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
433                            all_results,
434                        )));
435                    }
436                }
437            }
438        }
439
440        if FIND_ALL && !all_results.is_empty() {
441            return Ok(MetaLookupResult::SstLookup(SstLookupResult::Found(
442                all_results,
443            )));
444        }
445
446        Ok(miss_result)
447    }
448
449    pub fn batch_lookup<K: QueryKey>(
450        &self,
451        key_family: u32,
452        keys: &[K],
453        cells: &mut [(u64, usize, Option<LookupValue>)],
454        empty_cells: &mut usize,
455        key_block_cache: &BlockCache,
456        value_block_cache: &BlockCache,
457    ) -> Result<MetaBatchLookupResult> {
458        if key_family != self.family {
459            #[cfg(feature = "stats")]
460            return Ok(MetaBatchLookupResult {
461                family_miss: true,
462                ..Default::default()
463            });
464            #[cfg(not(feature = "stats"))]
465            return Ok(MetaBatchLookupResult {});
466        }
467        debug_assert!(
468            cells.is_sorted_by_key(|(hash, _, _)| *hash),
469            "Cells must be sorted by key hash"
470        );
471        #[allow(unused_mut, reason = "It's used when stats are enabled")]
472        let mut lookup_result = MetaBatchLookupResult::default();
473        for entry in self.entries.iter().rev() {
474            let start_index = cells
475                .binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
476                .err()
477                .unwrap();
478            if start_index >= cells.len() {
479                #[cfg(feature = "stats")]
480                {
481                    lookup_result.range_misses += 1;
482                }
483                continue;
484            }
485            let end_index = cells
486                .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
487                .err()
488                .unwrap()
489                .checked_sub(1);
490            let Some(end_index) = end_index else {
491                #[cfg(feature = "stats")]
492                {
493                    lookup_result.range_misses += 1;
494                }
495                continue;
496            };
497            if start_index > end_index {
498                #[cfg(feature = "stats")]
499                {
500                    lookup_result.range_misses += 1;
501                }
502                continue;
503            }
504            let amqf = entry.amqf(self)?;
505            for (hash, index, result) in &mut cells[start_index..=end_index] {
506                debug_assert!(
507                    *hash >= entry.min_hash && *hash <= entry.max_hash,
508                    "Key hash out of range"
509                );
510                if result.is_some() {
511                    continue;
512                }
513                if !amqf.contains_fingerprint(*hash) {
514                    #[cfg(feature = "stats")]
515                    {
516                        lookup_result.quick_filter_misses += 1;
517                    }
518                    continue;
519                }
520                let sst_result = entry.sst(self)?.lookup::<_, false>(
521                    *hash,
522                    &keys[*index],
523                    key_block_cache,
524                    value_block_cache,
525                )?;
526                if let SstLookupResult::Found(mut values) = sst_result {
527                    // find_all=false guarantees exactly one result
528                    debug_assert!(values.len() == 1);
529                    let Some(value) = values.pop() else {
530                        unreachable!()
531                    };
532                    *result = Some(value);
533                    *empty_cells -= 1;
534                    #[cfg(feature = "stats")]
535                    {
536                        lookup_result.hits += 1;
537                    }
538                    if *empty_cells == 0 {
539                        return Ok(lookup_result);
540                    }
541                } else {
542                    #[cfg(feature = "stats")]
543                    {
544                        lookup_result.sst_misses += 1;
545                    }
546                }
547            }
548        }
549        Ok(lookup_result)
550    }
551}