Skip to main content

turbo_persistence/
static_sorted_file.rs

1use std::{
2    cmp::Ordering,
3    fs::File,
4    hash::BuildHasherDefault,
5    path::Path,
6    rc::Rc,
7    sync::{
8        Arc,
9        atomic::{AtomicU64, Ordering as AtomicOrdering},
10    },
11};
12
13use anyhow::{Context, Result, bail, ensure};
14use memmap2::Mmap;
15use quick_cache::{Lifecycle, sync::GuardResult};
16use rustc_hash::FxHasher;
17use smallvec::SmallVec;
18
19use crate::{
20    QueryKey,
21    arc_bytes::ArcBytes,
22    be,
23    compression::checksum_block,
24    constants::MAX_INLINE_VALUE_SIZE,
25    lookup_entry::{IterValue, LookupEntry, LookupValue},
26    mmap_helper::advise_mmap_for_persistence,
27    rc_bytes::RcBytes,
28    shared_bytes::SharedBytes,
29    static_sorted_file_builder::{BLOCK_HEADER_SIZE, INDEX_BLOCK_ENTRY_SIZE},
30};
31
32/// The block header for an index block.
33pub const BLOCK_TYPE_INDEX: u8 = 0;
34/// The block header for a key block with 8-byte hash per entry.
35pub const BLOCK_TYPE_KEY_WITH_HASH: u8 = 1;
36/// The block header for a key block without hash.
37pub const BLOCK_TYPE_KEY_NO_HASH: u8 = 2;
38/// The block header for a fixed-size key block with 8-byte hash per entry.
39pub const BLOCK_TYPE_FIXED_KEY_WITH_HASH: u8 = 3;
40/// The block header for a fixed-size key block without hash.
41pub const BLOCK_TYPE_FIXED_KEY_NO_HASH: u8 = 4;
42
43/// The tag for a small-sized value.
44pub const KEY_BLOCK_ENTRY_TYPE_SMALL: u8 = 0;
45/// The tag for the blob value.
46pub const KEY_BLOCK_ENTRY_TYPE_BLOB: u8 = 1;
47/// The tag for the deleted value.
48pub const KEY_BLOCK_ENTRY_TYPE_DELETED: u8 = 2;
49/// The tag for a medium-sized value.
50pub const KEY_BLOCK_ENTRY_TYPE_MEDIUM: u8 = 3;
51/// The minimum tag for inline values. The actual size is (tag - INLINE_MIN).
52pub const KEY_BLOCK_ENTRY_TYPE_INLINE_MIN: u8 = 8;
53
54/// Encoded size of a small value reference: 2B block index + 2B size + 4B offset.
55pub(crate) const SMALL_VALUE_REF_SIZE: usize = 8;
56/// Encoded size of a medium value reference: 2B block index.
57pub(crate) const MEDIUM_VALUE_REF_SIZE: usize = 2;
58/// Encoded size of a blob value reference: 4B blob id.
59pub(crate) const BLOB_VALUE_REF_SIZE: usize = 4;
60/// Encoded size of a deleted (tombstone) value reference.
61pub(crate) const DELETED_VALUE_REF_SIZE: usize = 0;
62
63// Static assertion: MAX_INLINE_VALUE_SIZE must fit in the key type encoding.
64// Key types 8-255 encode inline values of size 0-247, so max is 255 - 8 = 247.
65const _: () = assert!(
66    MAX_INLINE_VALUE_SIZE <= (u8::MAX - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize,
67    "MAX_INLINE_VALUE_SIZE exceeds what can be encoded in key type byte"
68);
69
70/// The result of a lookup operation.
71pub enum SstLookupResult {
72    /// One or more values were found.
73    Found(SmallVec<[LookupValue; 1]>),
74    /// The key was not found.
75    NotFound,
76}
77
78impl From<LookupValue> for SstLookupResult {
79    fn from(value: LookupValue) -> Self {
80        SstLookupResult::Found(smallvec::smallvec![value])
81    }
82}
83
84#[derive(Clone, Default)]
85pub struct BlockWeighter;
86
87impl quick_cache::Weighter<(u32, u16), ArcBytes> for BlockWeighter {
88    fn weight(&self, _key: &(u32, u16), val: &ArcBytes) -> u64 {
89        if val.is_mmap_backed() {
90            // Mmap-backed blocks bypass the cache (served directly from mmap),
91            // so this branch should never be reached.
92            debug_assert!(
93                !val.is_mmap_backed(),
94                "mmap-backed block should not be inserted into BlockCache"
95            );
96            64
97        } else {
98            val.len() as u64 + 8
99        }
100    }
101}
102
103/// Lifecycle hooks for the block cache that prevent eviction of entries
104/// still referenced outside the cache (i.e., with `Arc` strong count > 1).
105#[derive(Clone, Default)]
106pub struct BlockCacheLifecycle;
107
108impl Lifecycle<(u32, u16), ArcBytes> for BlockCacheLifecycle {
109    type RequestState = ();
110
111    #[inline]
112    fn is_pinned(&self, _key: &(u32, u16), val: &ArcBytes) -> bool {
113        val.is_shared_arc()
114    }
115
116    #[inline]
117    fn begin_request(&self) -> Self::RequestState {}
118
119    #[inline]
120    fn on_evict(&self, _state: &mut Self::RequestState, _key: (u32, u16), _val: ArcBytes) {}
121}
122
123pub type BlockCache = quick_cache::sync::Cache<
124    (u32, u16),
125    ArcBytes,
126    BlockWeighter,
127    BuildHasherDefault<FxHasher>,
128    BlockCacheLifecycle,
129>;
130
131/// Trait abstracting value block reading for `handle_key_match_generic`.
132///
133/// Provides cached reads (small value blocks) and uncached reads (medium value
134/// blocks). Generic over the byte type so it works for both the lookup path
135/// (`ArcBytes` with `BlockCache`) and the iteration path (`RcBytes` with a
136/// single-entry `Option` cache).
137trait ValueBlockCache<B: SharedBytes> {
138    fn get_or_read(
139        self,
140        mmap: &B::MmapHandle,
141        meta: &StaticSortedFileMetaData,
142        block_index: u16,
143    ) -> Result<B>;
144}
145
146/// Bundles the shared block cache with the per-file CRC-verified bitmap,
147/// used on the lookup path.
148#[derive(Clone, Copy)]
149struct ArcBlockCacheReader<'a> {
150    cache: &'a BlockCache,
151    verified_blocks: &'a [AtomicU64],
152}
153
154/// Lookup-path: concurrent `BlockCache` with uncompressed-bypass and
155/// once-per-block CRC verification via `verified_blocks` bitmap.
156impl ValueBlockCache<ArcBytes> for ArcBlockCacheReader<'_> {
157    fn get_or_read(
158        self,
159        mmap: &Arc<Mmap>,
160        meta: &StaticSortedFileMetaData,
161        block_index: u16,
162    ) -> Result<ArcBytes> {
163        get_or_cache_block(mmap, meta, block_index, self.cache, self.verified_blocks)
164    }
165}
166
167/// Iteration-path: lightweight single-entry cache for sequential reads.
168impl ValueBlockCache<RcBytes> for &mut Option<(u16, RcBytes)> {
169    fn get_or_read(
170        self,
171        mmap: &Rc<Mmap>,
172        meta: &StaticSortedFileMetaData,
173        block_index: u16,
174    ) -> Result<RcBytes> {
175        if let Some((idx, block)) = self.as_ref()
176            && *idx == block_index
177        {
178            return Ok(block.clone());
179        }
180        let block: RcBytes = read_block_generic(mmap, meta, block_index)?;
181        *self = Some((block_index, block.clone()));
182        Ok(block)
183    }
184}
185
186#[derive(Clone, Copy, Debug)]
187pub struct StaticSortedFileMetaData {
188    /// The sequence number of this file.
189    pub sequence_number: u32,
190    /// The number of blocks in the SST file.
191    pub block_count: u16,
192}
193
194impl StaticSortedFileMetaData {
195    pub fn block_offsets_start(&self, sst_len: usize) -> usize {
196        let bc: usize = self.block_count.into();
197        sst_len - (bc * size_of::<u32>())
198    }
199}
200
201/// A memory mapped SST file.
202pub struct StaticSortedFile {
203    /// The meta file of this file.
204    meta: StaticSortedFileMetaData,
205    /// The memory mapped file.
206    /// We store as an Arc so we can hand out references (via ArcBytes) that can outlive this
207    /// struct (not that we expect them to outlive it by very much)
208    mmap: Arc<Mmap>,
209    /// One bit per block, set when that block's CRC has been verified at least once.
210    /// Uncompressed (mmap-backed) blocks bypass the `BlockCache`, so without this
211    /// bitmap the CRC would be re-computed on every access. `Relaxed` ordering
212    /// suffices: racing first-time verifications are idempotent.
213    verified_blocks: Box<[AtomicU64]>,
214}
215
216impl StaticSortedFile {
217    /// Opens an SST file at the given path. This memory maps the file, but does not read it yet.
218    /// It's lazy read on demand.
219    pub fn open(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
220        let filename = format!("{:08}.sst", meta.sequence_number);
221        let path = db_path.join(&filename);
222        let file = File::open(&path)
223            .with_context(|| format!("Failed to open SST file {}", path.display()))?;
224        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
225            format!(
226                "Failed to mmap SST file {} ({} bytes)",
227                path.display(),
228                file.metadata().map(|m| m.len()).unwrap_or(0)
229            )
230        })?;
231        #[cfg(unix)]
232        {
233            mmap.advise(memmap2::Advice::Random)?;
234            let offset = meta.block_offsets_start(mmap.len());
235            let _ = mmap.advise_range(memmap2::Advice::Sequential, offset, mmap.len() - offset);
236        }
237        advise_mmap_for_persistence(&mmap)?;
238        let bitmap_words = (meta.block_count as usize).div_ceil(u64::BITS as usize);
239        let verified_blocks = (0..bitmap_words)
240            .map(|_| AtomicU64::new(0))
241            .collect::<Box<[_]>>();
242        Ok(Self {
243            meta,
244            mmap: Arc::new(mmap),
245            verified_blocks,
246        })
247    }
248
249    /// Looks up a key in this file.
250    ///
251    /// If `FIND_ALL` is false, returns after finding the first match.
252    /// If `FIND_ALL` is true, returns all entries with the same key (useful for
253    /// keyspaces where keys are hashes and collisions are possible).
254    pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
255        &self,
256        key_hash: u64,
257        key: &K,
258        key_block_cache: &BlockCache,
259        value_block_cache: &BlockCache,
260    ) -> Result<SstLookupResult> {
261        // There is exactly one index block per file (always the last block).
262        // Read it first, then dispatch directly to the key block it points to.
263        let index_block_index = self.meta.block_count - 1;
264        let index_block = get_or_cache_block(
265            &self.mmap,
266            &self.meta,
267            index_block_index,
268            key_block_cache,
269            &self.verified_blocks,
270        )?;
271        let key_block_index = self.lookup_index_block(&index_block, key_hash)?;
272
273        let key_block_arc = get_or_cache_block(
274            &self.mmap,
275            &self.meta,
276            key_block_index,
277            key_block_cache,
278            &self.verified_blocks,
279        )?;
280        let reader = ArcBlockCacheReader {
281            cache: value_block_cache,
282            verified_blocks: &self.verified_blocks,
283        };
284        let block_type = be::read_u8(&key_block_arc);
285        match block_type {
286            BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
287                let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH;
288                self.lookup_key_block::<K, FIND_ALL>(key_block_arc, key_hash, key, has_hash, reader)
289            }
290
291            BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
292                let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH;
293                self.lookup_fixed_key_block::<K, FIND_ALL>(
294                    key_block_arc,
295                    key_hash,
296                    key,
297                    has_hash,
298                    reader,
299                )
300            }
301            _ => {
302                bail!("Invalid block type");
303            }
304        }
305    }
306
307    /// Looks up a hash in a index block.
308    fn lookup_index_block(&self, block: &[u8], hash: u64) -> Result<u16> {
309        ensure!(block.len() >= 3, "index block too short");
310        debug_assert!(
311            be::read_u8(block) == BLOCK_TYPE_INDEX,
312            "expected index block as last block"
313        );
314        let first_block = be::read_u16(&block[1..]);
315        let (entries, remainder) = block[3..].as_chunks::<INDEX_BLOCK_ENTRY_SIZE>();
316        if entries.is_empty() {
317            return Ok(first_block);
318        }
319        if !remainder.is_empty() {
320            bail!("invalid index block, {} extra bytes", remainder.len())
321        }
322        match entries.binary_search_by(|entry| be::read_u64(entry).cmp(&hash)) {
323            Ok(i) => Ok(be::read_u16(&entries[i][8..])),
324            Err(0) => Ok(first_block),
325            Err(i) => Ok(be::read_u16(&entries[i - 1][8..])),
326        }
327    }
328
329    /// Looks up a key in a key block and the value in a value block.
330    ///
331    /// If `FIND_ALL` is false, returns after finding the first match.
332    /// If `FIND_ALL` is true, collects all entries with the same key.
333    fn lookup_key_block<K: QueryKey, const FIND_ALL: bool>(
334        &self,
335        block: ArcBytes,
336        key_hash: u64,
337        key: &K,
338        has_hash: bool,
339        reader: ArcBlockCacheReader<'_>,
340    ) -> Result<SstLookupResult> {
341        let hash_len: u8 = if has_hash { 8 } else { 0 };
342        ensure!(block.len() >= 4, "key block too short");
343        let entry_count = be::read_u24(&block[1..]) as usize;
344        let data = &block[4..];
345        ensure!(
346            data.len() >= entry_count * 4,
347            "key block too short for {entry_count} entries"
348        );
349        let offsets = &data[..entry_count * 4];
350        let entries = &data[entry_count * 4..];
351
352        self.lookup_block_inner::<K, FIND_ALL>(&block, entry_count, key_hash, key, reader, |i| {
353            get_key_entry(offsets, entries, entry_count, i, hash_len)
354        })
355    }
356
357    /// Looks up a key in a fixed-size key block.
358    ///
359    /// Fixed-size key blocks store entries at predictable offsets (no offset table),
360    /// enabling direct indexing during binary search.
361    fn lookup_fixed_key_block<K: QueryKey, const FIND_ALL: bool>(
362        &self,
363        block: ArcBytes,
364        key_hash: u64,
365        key: &K,
366        has_hash: bool,
367        reader: ArcBlockCacheReader<'_>,
368    ) -> Result<SstLookupResult> {
369        let hash_len: u8 = if has_hash { 8 } else { 0 };
370        ensure!(block.len() >= 6, "fixed key block too short");
371        let entry_count = be::read_u24(&block[1..]) as usize;
372        let key_size = be::read_u8(&block[4..]) as usize;
373        let value_type = be::read_u8(&block[5..]);
374        let val_size = entry_val_size(value_type)?;
375        let stride = hash_len as usize + key_size + val_size;
376        let entries = &block[6..];
377        ensure!(
378            entries.len() == entry_count * stride,
379            "fixed key block for {entry_count} entries must is the wrong size"
380        );
381
382        self.lookup_block_inner::<K, FIND_ALL>(&block, entry_count, key_hash, key, reader, |i| {
383            Ok(get_fixed_key_entry(
384                entries, i, hash_len, key_size, value_type, stride,
385            ))
386        })
387    }
388
389    /// Shared binary search + collection logic for both key block variants.
390    ///
391    /// The `get_entry` closure abstracts over the difference between variable-size
392    /// key blocks (offset table lookup) and fixed-size key blocks (stride-based indexing).
393    fn lookup_block_inner<'a, K: QueryKey, const FIND_ALL: bool>(
394        &self,
395        block: &ArcBytes,
396        entry_count: usize,
397        key_hash: u64,
398        key: &K,
399        reader: ArcBlockCacheReader<'_>,
400        get_entry: impl Fn(usize) -> Result<GetKeyEntryResult<'a>>,
401    ) -> Result<SstLookupResult> {
402        let mut l = 0;
403        let mut r = entry_count;
404        // binary search for a matching key
405        while l < r {
406            let m = (l + r) / 2;
407            let GetKeyEntryResult {
408                hash: mid_hash,
409                key: mid_key,
410                ty,
411                val,
412            } = get_entry(m)?;
413
414            let comparison = compare_hash_key(mid_hash, mid_key, key_hash, key);
415
416            match comparison {
417                Ordering::Less => r = m,
418                Ordering::Equal => {
419                    if !FIND_ALL {
420                        // SingleValue mode: each key has exactly one entry
421                        // this is enforced when writing
422                        let result = self.handle_key_match(ty, val, block, reader)?;
423                        return Ok(SstLookupResult::Found(SmallVec::from_buf([result])));
424                    }
425                    // FIND_ALL (MultiValue) mode: collect all values for this key.
426                    // Tombstones (Deleted) sort last within each key group, so we
427                    // scan backward to find the start of the key group, then forward
428                    // to collect all entries. The tombstone, if present, will be the
429                    // last entry in the results.
430                    let mut results = SmallVec::new();
431                    for i in (l..m).rev() {
432                        let GetKeyEntryResult {
433                            hash,
434                            key: entry_key,
435                            ty,
436                            val,
437                        } = get_entry(i)?;
438                        if !entry_matches_key(hash, entry_key, key_hash, key) {
439                            break;
440                        }
441                        results.push(self.handle_key_match(ty, val, block, reader)?);
442                    }
443                    // Technically we could `.reverse()` the items collected by the backwards
444                    // scan, but the only ordering constraint we need to maintain for single
445                    // sst multivalue reads is that a deleted token, if it exists comes last.
446                    // Because all the backwards scan items are strictly before the found item
447                    // we know they don't contain the _last_ item. So we don't care about
448                    // their order.
449
450                    // Add the entry at `m`
451                    results.push(self.handle_key_match(ty, val, block, reader)?);
452                    for i in (m + 1)..r {
453                        let GetKeyEntryResult {
454                            hash,
455                            key: entry_key,
456                            ty,
457                            val,
458                        } = get_entry(i)?;
459                        if !entry_matches_key(hash, entry_key, key_hash, key) {
460                            break;
461                        }
462                        results.push(self.handle_key_match(ty, val, block, reader)?);
463                    }
464                    return Ok(SstLookupResult::Found(results));
465                }
466                Ordering::Greater => l = m + 1,
467            }
468        }
469
470        Ok(SstLookupResult::NotFound)
471    }
472
473    /// Handles a key match by looking up the value.
474    fn handle_key_match(
475        &self,
476        ty: u8,
477        val: &[u8],
478        key_block_arc: &ArcBytes,
479        reader: ArcBlockCacheReader<'_>,
480    ) -> Result<LookupValue> {
481        handle_key_match_generic(&self.mmap, &self.meta, ty, val, key_block_arc, reader)
482    }
483}
484
485/// Gets a block from the cache, or reads it from the mmap and inserts it.
486///
487/// Reads the block header exactly once via `get_raw_block_slice` (which
488/// includes all `strict_checks` bounds guards). Uncompressed blocks bypass
489/// the cache — an mmap-backed `ArcBytes` is cheaper than a cache lookup.
490/// Their CRC is verified at most once per file open, tracked by
491/// `verified_blocks`. Compressed blocks are looked up in `cache`; on a
492/// miss they are decompressed, CRC-verified, and inserted.
493fn get_or_cache_block(
494    mmap: &Arc<Mmap>,
495    meta: &StaticSortedFileMetaData,
496    block_index: u16,
497    cache: &BlockCache,
498    verified_blocks: &[AtomicU64],
499) -> Result<ArcBytes> {
500    let (uncompressed_length, checksum, block_data) = get_raw_block_slice(mmap, meta, block_index)
501        .with_context(|| {
502            format!(
503                "Failed to read raw block {} from {:08}.sst",
504                block_index, meta.sequence_number
505            )
506        })?;
507
508    if uncompressed_length == 0 {
509        // Uncompressed: serve directly from mmap. Verify CRC only once per file open.
510        verify_checksum_once(meta, block_data, checksum, block_index, verified_blocks)?;
511        // SAFETY: block_data points into the mmap backing `mmap`.
512        return Ok(unsafe { ArcBytes::from_mmap(mmap, block_data) });
513    }
514
515    // Compressed: check cache; decompress and insert on miss.
516    Ok(
517        match cache.get_value_or_guard(&(meta.sequence_number, block_index), None) {
518            GuardResult::Value(block) => block,
519            GuardResult::Guard(guard) => {
520                // A cached block may have been evicted, so re-reading still
521                // benefits from the bitmap to skip redundant CRC verification.
522                verify_checksum_once(meta, block_data, checksum, block_index, verified_blocks)?;
523                let block = ArcBytes::from_decompressed(uncompressed_length, block_data)
524                    .with_context(|| {
525                        format!(
526                            "Failed to decompress block {} from {:08}.sst ({} bytes uncompressed)",
527                            block_index, meta.sequence_number, uncompressed_length
528                        )
529                    })?;
530                let _ = guard.insert(block.clone());
531                block
532            }
533            GuardResult::Timeout => unreachable!(),
534        },
535    )
536}
537
538/// Gets the raw block slice directly from a memory-mapped file.
539/// Returns `(uncompressed_length, checksum, block_data)`.
540fn get_raw_block_slice<'a>(
541    mmap: &'a Mmap,
542    meta: &StaticSortedFileMetaData,
543    block_index: u16,
544) -> Result<(u32, u32, &'a [u8])> {
545    #[cfg(feature = "strict_checks")]
546    if block_index >= meta.block_count {
547        bail!(
548            "Corrupted file seq:{} block:{} > number of blocks {} (block_offsets: {:x})",
549            meta.sequence_number,
550            block_index,
551            meta.block_count,
552            meta.block_offsets_start(mmap.len()),
553        );
554    }
555    let offset = meta.block_offsets_start(mmap.len()) + block_index as usize * 4;
556    #[cfg(feature = "strict_checks")]
557    if offset + 4 > mmap.len() {
558        bail!(
559            "Corrupted file seq:{} block:{} block offset locations {} + 4 bytes > file end {} \
560             (block_offsets: {:x})",
561            meta.sequence_number,
562            block_index,
563            offset,
564            mmap.len(),
565            meta.block_offsets_start(mmap.len()),
566        );
567    }
568    let block_start = if block_index == 0 {
569        0
570    } else {
571        be::read_u32(&mmap[offset - 4..]) as usize
572    };
573    let block_end = be::read_u32(&mmap[offset..]) as usize;
574    #[cfg(feature = "strict_checks")]
575    if block_end > mmap.len() || block_start > mmap.len() {
576        bail!(
577            "Corrupted file seq:{} block:{} block {} - {} > file end {} (block_offsets: {:x})",
578            meta.sequence_number,
579            block_index,
580            block_start,
581            block_end,
582            mmap.len(),
583            meta.block_offsets_start(mmap.len()),
584        );
585    }
586    ensure!(
587        block_start + BLOCK_HEADER_SIZE <= block_end,
588        "block {} header truncated in {:08}.sst",
589        block_index,
590        meta.sequence_number
591    );
592    let uncompressed_length = be::read_u32(&mmap[block_start..]);
593    let checksum = be::read_u32(&mmap[block_start + 4..]);
594    let block = &mmap[block_start + BLOCK_HEADER_SIZE..block_end];
595    Ok((uncompressed_length, checksum, block))
596}
597
598/// Verifies the CRC32 checksum of on-disk block data. Returns an error on mismatch.
599fn verify_checksum(
600    meta: &StaticSortedFileMetaData,
601    data: &[u8],
602    expected: u32,
603    block_index: u16,
604) -> Result<()> {
605    let actual = checksum_block(data);
606    if actual != expected {
607        bail!(
608            "Cache corruption detected: checksum mismatch in block {} of {:08}.sst (expected \
609             {:08x}, got {:08x})",
610            block_index,
611            meta.sequence_number,
612            expected,
613            actual
614        );
615    }
616    Ok(())
617}
618
619/// Verifies a block's CRC using the `verified_blocks` bitmap to avoid redundant
620/// work. In practice each block is verified once, but concurrent first-time
621/// accesses may race and verify the same block more than once — this is harmless
622/// since the check is deterministic and idempotent. Verification failures are
623/// *not* recorded in the bitmap, so a corrupted block will be re-checked (and
624/// fail again) on every access.
625fn verify_checksum_once(
626    meta: &StaticSortedFileMetaData,
627    data: &[u8],
628    expected: u32,
629    block_index: u16,
630    verified_blocks: &[AtomicU64],
631) -> Result<()> {
632    let word_idx = block_index as usize / u64::BITS as usize;
633    let bit = 1u64 << (block_index as usize % u64::BITS as usize);
634    if verified_blocks[word_idx].load(AtomicOrdering::Relaxed) & bit != 0 {
635        return Ok(());
636    }
637    verify_checksum(meta, data, expected, block_index)?;
638    verified_blocks[word_idx].fetch_or(bit, AtomicOrdering::Relaxed);
639    Ok(())
640}
641
642/// Returns `(uncompressed_length, checksum, block)` wrapping the raw on-disk
643/// data as the given byte type. Generic over `ArcBytes`/`RcBytes`.
644fn get_raw_block_generic<B: SharedBytes>(
645    mmap: &B::MmapHandle,
646    meta: &StaticSortedFileMetaData,
647    block_index: u16,
648) -> Result<(u32, u32, B)> {
649    let (uncompressed_length, checksum, block) = get_raw_block_slice(mmap, meta, block_index)?;
650    // SAFETY: block points into mmap which backs the MmapHandle.
651    Ok((uncompressed_length, checksum, unsafe {
652        B::from_mmap(mmap, block)
653    }))
654}
655
656/// Reads a block, decompresses if needed, and verifies its checksum.
657/// Generic over the byte type (`ArcBytes` or `RcBytes`).
658#[tracing::instrument(level = "info", name = "reading database block", skip_all)]
659fn read_block_generic<B: SharedBytes>(
660    mmap: &B::MmapHandle,
661    meta: &StaticSortedFileMetaData,
662    block_index: u16,
663) -> Result<B> {
664    let (uncompressed_length, expected_checksum, block) =
665        get_raw_block_slice(mmap, meta, block_index).with_context(|| {
666            format!(
667                "Failed to read raw block {} from {:08}.sst",
668                block_index, meta.sequence_number
669            )
670        })?;
671
672    verify_checksum(meta, block, expected_checksum, block_index)?;
673
674    if uncompressed_length == 0 {
675        // SAFETY: callers guarantee block points into the mmap.
676        return Ok(unsafe { B::from_mmap(mmap, block) });
677    }
678
679    let buffer = B::from_decompressed(uncompressed_length, block).with_context(|| {
680        format!(
681            "Failed to decompress block {} from {:08}.sst ({} bytes uncompressed)",
682            block_index, meta.sequence_number, uncompressed_length
683        )
684    })?;
685    Ok(buffer)
686}
687
688/// Handles a key match by resolving the value reference. Generic over byte type.
689fn handle_key_match_generic<B: SharedBytes>(
690    mmap: &B::MmapHandle,
691    meta: &StaticSortedFileMetaData,
692    ty: u8,
693    val: &[u8],
694    key_block: &B,
695    reader: impl ValueBlockCache<B>,
696) -> Result<LookupValue<B>> {
697    Ok(match ty {
698        KEY_BLOCK_ENTRY_TYPE_SMALL => {
699            let block = be::read_u16(val);
700            let size = be::read_u16(&val[2..]) as usize;
701            let position = be::read_u32(&val[4..]) as usize;
702            let value = reader
703                .get_or_read(mmap, meta, block)?
704                .slice(position..position + size);
705            LookupValue::Slice { value }
706        }
707        KEY_BLOCK_ENTRY_TYPE_MEDIUM => {
708            let block = be::read_u16(val);
709            let value = read_block_generic(mmap, meta, block)?;
710            LookupValue::Slice { value }
711        }
712        KEY_BLOCK_ENTRY_TYPE_BLOB => {
713            let sequence_number = be::read_u32(val);
714            LookupValue::Blob { sequence_number }
715        }
716        KEY_BLOCK_ENTRY_TYPE_DELETED => LookupValue::Deleted,
717        _ => {
718            // Inline value — val is already the correct slice
719            // SAFETY: val points into key_block's data
720            let value = unsafe { key_block.slice_from_subslice(val) };
721            LookupValue::Slice { value }
722        }
723    })
724}
725
726/// An iterator over all entries in a SST file in sorted order.
727pub struct StaticSortedFileIter {
728    /// The memory-mapped file, wrapped in `Rc` for non-atomic refcounting.
729    /// All `RcBytes` slices produced during iteration share this `Rc`.
730    mmap: Rc<Mmap>,
731    /// Metadata (sequence number, block count) needed for block access.
732    meta: StaticSortedFileMetaData,
733
734    /// The root index block entries (body bytes starting after the type byte).
735    /// SST files have exactly one index level.
736    index_entries: RcBytes,
737    /// Total key block references in the index block (first_child + boundary entries).
738    num_index_entries: usize,
739    /// Next index entry to read from the index block.
740    index_pos: usize,
741    current_key_block: CurrentKeyBlock,
742    /// Single-entry value block cache. Within a key block, entries reference
743    /// value blocks sequentially and don't revisit earlier blocks, so caching
744    /// just the current one avoids redundant decompression.
745    value_block_cache: Option<(u16, RcBytes)>,
746}
747
748enum CurrentKeyBlockKind {
749    /// Variable-size entries with an offset table for random access.
750    Variable { offsets: RcBytes, hash_len: u8 },
751    /// Fixed-size entries with uniform key size and value type (no offset table).
752    Fixed {
753        hash_len: u8,
754        key_size: usize,
755        value_type: u8,
756        stride: usize,
757    },
758}
759
760struct CurrentKeyBlock {
761    kind: CurrentKeyBlockKind,
762    entries: RcBytes,
763    /// Number of entries in this key block (max ~819 per 16 KiB block).
764    entry_count: u32,
765    /// Current position within the key block.
766    index: u32,
767}
768
769impl Iterator for StaticSortedFileIter {
770    type Item = Result<LookupEntry>;
771
772    fn next(&mut self) -> Option<Self::Item> {
773        self.next_internal().transpose()
774    }
775}
776
777impl StaticSortedFileIter {
778    /// Opens an SST file for sequential iteration. Uses `MADV_SEQUENTIAL` for
779    /// read-ahead and wraps the mmap in `Rc<Mmap>` directly (no `Arc`),
780    /// eliminating all atomic refcounting during iteration.
781    pub fn open(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
782        let filename = format!("{:08}.sst", meta.sequence_number);
783        let path = db_path.join(&filename);
784        let file = File::open(&path)
785            .with_context(|| format!("Failed to open SST file {}", path.display()))?;
786        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
787            format!(
788                "Failed to mmap SST file {} ({} bytes)",
789                path.display(),
790                file.metadata().map(|m| m.len()).unwrap_or(0)
791            )
792        })?;
793        #[cfg(unix)]
794        mmap.advise(memmap2::Advice::Sequential)?;
795        advise_mmap_for_persistence(&mmap)?;
796        Self::new(Rc::new(mmap), meta)
797            .with_context(|| format!("Unable to open static sorted file {filename}"))
798    }
799
800    fn new(mmap: Rc<Mmap>, meta: StaticSortedFileMetaData) -> Result<Self> {
801        let root_block_index = meta.block_count - 1;
802        let block: RcBytes = read_block_generic(&mmap, &meta, root_block_index)?;
803        let block_type = block[0];
804
805        // The builder always writes an index block as the root block.
806        if block_type != BLOCK_TYPE_INDEX {
807            bail!("Root block must be an index block");
808        }
809        let block_len = block.len();
810        ensure!(block_len >= 3, "index block too short");
811        let index_entries = block.slice(1..block_len);
812        let first_child = be::read_u16(&index_entries);
813        // Index block body layout: [first_child: u16] [hash: u64, block: u16]*
814        // Compute total key block references (first_child + N boundary entries)
815        // using ceil division: (body_len - sizeof(first_child) + ENTRY_SIZE - 1) / ENTRY_SIZE + 1
816        // simplified to (body_len + ENTRY_SIZE - 2) / ENTRY_SIZE
817        let num_index_entries: usize = (index_entries.len() + INDEX_BLOCK_ENTRY_SIZE
818            - size_of::<u16>())
819            / INDEX_BLOCK_ENTRY_SIZE;
820
821        let current_key_block = Self::parse_key_block(&mmap, &meta, first_child)?;
822        Ok(StaticSortedFileIter {
823            mmap,
824            meta,
825            index_entries,
826            num_index_entries,
827            index_pos: 1,
828            current_key_block,
829            value_block_cache: None,
830        })
831    }
832
833    /// Parses a key block at the given index, returning `RcBytes`-backed data.
834    fn parse_key_block(
835        mmap: &Rc<Mmap>,
836        meta: &StaticSortedFileMetaData,
837        block_index: u16,
838    ) -> Result<CurrentKeyBlock> {
839        let block: RcBytes = read_block_generic(mmap, meta, block_index)?;
840        let data = &*block;
841        ensure!(data.len() >= 4, "key block too short");
842        let block_type = data[0];
843        let entry_count = be::read_u24(&data[1..]);
844        match block_type {
845            BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
846                let hash_len = if block_type == BLOCK_TYPE_KEY_WITH_HASH {
847                    8
848                } else {
849                    0
850                };
851                let n = entry_count as usize;
852                let offsets_range = 4..4 + n * 4;
853                let entries_range = 4 + n * 4..block.len();
854                let offsets = block.clone().slice(offsets_range);
855                let entries = block.slice(entries_range);
856                Ok(CurrentKeyBlock {
857                    kind: CurrentKeyBlockKind::Variable { offsets, hash_len },
858                    entries,
859                    entry_count,
860                    index: 0,
861                })
862            }
863            BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
864                let hash_len = if block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH {
865                    8
866                } else {
867                    0
868                };
869                let key_size = data[4] as usize;
870                let value_type = data[5];
871                let val_size = entry_val_size(value_type)?;
872                let stride = hash_len as usize + key_size + val_size;
873                // Header is 6 bytes for fixed-size blocks
874                let entries_range = 6..block.len();
875                let entries = block.slice(entries_range);
876                Ok(CurrentKeyBlock {
877                    kind: CurrentKeyBlockKind::Fixed {
878                        hash_len,
879                        key_size,
880                        value_type,
881                        stride,
882                    },
883                    entries,
884                    entry_count,
885                    index: 0,
886                })
887            }
888            _ => {
889                bail!("Invalid key block type: {block_type}");
890            }
891        }
892    }
893
894    /// Gets the next entry in the file and moves the cursor.
895    fn next_internal(&mut self) -> Result<Option<LookupEntry>> {
896        loop {
897            let kb = &mut self.current_key_block;
898            if kb.index < kb.entry_count {
899                let index = kb.index as usize;
900                let entry_count = kb.entry_count as usize;
901                let GetKeyEntryResult { hash, key, ty, val } = match &kb.kind {
902                    CurrentKeyBlockKind::Variable { offsets, hash_len } => {
903                        get_key_entry(offsets, &kb.entries, entry_count, index, *hash_len)?
904                    }
905                    CurrentKeyBlockKind::Fixed {
906                        hash_len,
907                        key_size,
908                        value_type,
909                        stride,
910                    } => get_fixed_key_entry(
911                        &kb.entries,
912                        index,
913                        *hash_len,
914                        *key_size,
915                        *value_type,
916                        *stride,
917                    ),
918                };
919                let full_hash = if hash.is_empty() {
920                    crate::key::hash_key(&key)
921                } else {
922                    be::read_u64(hash)
923                };
924                let value = if ty == KEY_BLOCK_ENTRY_TYPE_MEDIUM {
925                    let block = be::read_u16(val);
926                    let (uncompressed_size, checksum, block) =
927                        get_raw_block_generic(&self.mmap, &self.meta, block)?;
928                    IterValue::Medium {
929                        uncompressed_size,
930                        checksum,
931                        block,
932                    }
933                } else {
934                    handle_key_match_generic(
935                        &self.mmap,
936                        &self.meta,
937                        ty,
938                        val,
939                        &kb.entries,
940                        &mut self.value_block_cache,
941                    )?
942                    .into()
943                };
944                let entry = LookupEntry {
945                    hash: full_hash,
946                    key: unsafe { kb.entries.slice_from_subslice(key) },
947                    value,
948                };
949                kb.index += 1;
950                return Ok(Some(entry));
951            }
952            if self.index_pos < self.num_index_entries {
953                let base = self.index_pos * INDEX_BLOCK_ENTRY_SIZE;
954                let block_index = be::read_u16(&self.index_entries[base..]);
955                self.index_pos += 1;
956                self.current_key_block =
957                    Self::parse_key_block(&self.mmap, &self.meta, block_index)?;
958            } else {
959                return Ok(None);
960            }
961        }
962    }
963}
964
965struct GetKeyEntryResult<'l> {
966    hash: &'l [u8],
967    key: &'l [u8],
968    ty: u8,
969    val: &'l [u8],
970}
971
972/// Compares a query (full_hash + query_key) against an entry (entry_hash + entry_key).
973/// Returns the ordering of query relative to entry.
974/// When entry_hash is empty, computes full hash from entry_key.
975fn compare_hash_key<K: QueryKey>(
976    entry_hash: &[u8],
977    entry_key: &[u8],
978    full_hash: u64,
979    query_key: &K,
980) -> Ordering {
981    if entry_hash.is_empty() {
982        // No hash stored - compute full hash from entry key
983        let entry_full_hash = crate::key::hash_key(&entry_key);
984        match full_hash.cmp(&entry_full_hash) {
985            Ordering::Equal => query_key.cmp(entry_key),
986            ord => ord,
987        }
988    } else {
989        // Full 8-byte hash stored - compare hashes first
990        let full_hash_bytes = full_hash.to_be_bytes();
991        match full_hash_bytes[..].cmp(entry_hash) {
992            Ordering::Equal => query_key.cmp(entry_key),
993            ord => ord,
994        }
995    }
996}
997
998/// Checks if a query key equals an entry key, optionally comparing stored hashes first.
999/// When a hash is stored (8 bytes), compares hashes before keys for speed.
1000/// When no hash is stored, compares keys directly (avoiding hash recomputation).
1001fn entry_matches_key<K: QueryKey>(
1002    entry_hash: &[u8],
1003    entry_key: &[u8],
1004    full_hash: u64,
1005    query_key: &K,
1006) -> bool {
1007    if entry_hash.is_empty() {
1008        // No hash stored - compare keys directly instead of recomputing hash
1009        query_key.cmp(entry_key) == Ordering::Equal
1010    } else {
1011        // Hash stored - cheap 8-byte comparison first, then key comparison
1012        full_hash.to_be_bytes()[..] == *entry_hash && query_key.cmp(entry_key) == Ordering::Equal
1013    }
1014}
1015
1016/// Returns the byte size of the value portion for a given key block entry type.
1017fn entry_val_size(ty: u8) -> Result<usize> {
1018    match ty {
1019        KEY_BLOCK_ENTRY_TYPE_SMALL => Ok(SMALL_VALUE_REF_SIZE),
1020        KEY_BLOCK_ENTRY_TYPE_MEDIUM => Ok(MEDIUM_VALUE_REF_SIZE),
1021        KEY_BLOCK_ENTRY_TYPE_BLOB => Ok(BLOB_VALUE_REF_SIZE),
1022        KEY_BLOCK_ENTRY_TYPE_DELETED => Ok(DELETED_VALUE_REF_SIZE),
1023        ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
1024            Ok((ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize)
1025        }
1026        _ => bail!("Invalid key block entry type: {ty}"),
1027    }
1028}
1029
1030/// Reads the type and start offset from an offset table entry.
1031/// Each entry is 4 bytes: 1 byte type + 3 bytes BE offset.
1032#[inline(always)]
1033fn read_offset_entry(offsets: &[u8], index: usize) -> (u8, usize) {
1034    let base = index * 4;
1035    let word = be::read_u32(&offsets[base..]);
1036    let ty = (word >> 24) as u8;
1037    let offset = (word & 0x00FF_FFFF) as usize;
1038    (ty, offset)
1039}
1040
1041/// Reads a key entry from a key block.
1042fn get_key_entry<'l>(
1043    offsets: &[u8],
1044    entries: &'l [u8],
1045    entry_count: usize,
1046    index: usize,
1047    hash_len: u8,
1048) -> Result<GetKeyEntryResult<'l>> {
1049    let hash_len_usize = hash_len as usize;
1050    let (ty, start) = read_offset_entry(offsets, index);
1051    let end = if index == entry_count - 1 {
1052        entries.len()
1053    } else {
1054        let (_, next_start) = read_offset_entry(offsets, index + 1);
1055        next_start
1056    };
1057    // Return the raw hash bytes slice (0-8 bytes depending on hash_len)
1058    let hash = &entries[start..start + hash_len_usize];
1059    let val_size = entry_val_size(ty)?;
1060    Ok(GetKeyEntryResult {
1061        hash,
1062        key: &entries[start + hash_len_usize..end - val_size],
1063        ty,
1064        val: &entries[end - val_size..end],
1065    })
1066}
1067
1068/// Reads a key entry from a fixed-size key block by direct indexing.
1069///
1070/// All entries have the same key size and value type, so positions are computed
1071/// arithmetically with no offset table indirection.
1072fn get_fixed_key_entry<'l>(
1073    entries: &'l [u8],
1074    index: usize,
1075    hash_len: u8,
1076    key_size: usize,
1077    value_type: u8,
1078    stride: usize,
1079) -> GetKeyEntryResult<'l> {
1080    let hash_len_usize = hash_len as usize;
1081    let start = index * stride;
1082    GetKeyEntryResult {
1083        hash: &entries[start..start + hash_len_usize],
1084        key: &entries[start + hash_len_usize..start + hash_len_usize + key_size],
1085        ty: value_type,
1086        val: &entries[start + hash_len_usize + key_size..(index + 1) * stride],
1087    }
1088}