Skip to main content

turbo_persistence/
static_sorted_file.rs

1use std::{
2    cmp::Ordering,
3    fs::File,
4    hash::BuildHasherDefault,
5    path::{Path, PathBuf},
6    sync::Arc,
7};
8
9use anyhow::{Context, Result, bail};
10use byteorder::{BE, ReadBytesExt};
11use memmap2::Mmap;
12use quick_cache::sync::GuardResult;
13use rustc_hash::FxHasher;
14use smallvec::SmallVec;
15
16use crate::{
17    QueryKey,
18    arc_bytes::ArcBytes,
19    compression::{checksum_block, decompress_into_arc},
20    constants::MAX_INLINE_VALUE_SIZE,
21    lookup_entry::{LazyLookupValue, LookupEntry, LookupValue},
22    mmap_helper::advise_mmap_for_persistence,
23    static_sorted_file_builder::BLOCK_HEADER_SIZE,
24};
25
26/// The block header for an index block.
27pub const BLOCK_TYPE_INDEX: u8 = 0;
28/// The block header for a key block with 8-byte hash per entry.
29pub const BLOCK_TYPE_KEY_WITH_HASH: u8 = 1;
30/// The block header for a key block without hash.
31pub const BLOCK_TYPE_KEY_NO_HASH: u8 = 2;
32/// The block header for a fixed-size key block with 8-byte hash per entry.
33pub const BLOCK_TYPE_FIXED_KEY_WITH_HASH: u8 = 3;
34/// The block header for a fixed-size key block without hash.
35pub const BLOCK_TYPE_FIXED_KEY_NO_HASH: u8 = 4;
36
37/// The tag for a small-sized value.
38pub const KEY_BLOCK_ENTRY_TYPE_SMALL: u8 = 0;
39/// The tag for the blob value.
40pub const KEY_BLOCK_ENTRY_TYPE_BLOB: u8 = 1;
41/// The tag for the deleted value.
42pub const KEY_BLOCK_ENTRY_TYPE_DELETED: u8 = 2;
43/// The tag for a medium-sized value.
44pub const KEY_BLOCK_ENTRY_TYPE_MEDIUM: u8 = 3;
45/// The minimum tag for inline values. The actual size is (tag - INLINE_MIN).
46pub const KEY_BLOCK_ENTRY_TYPE_INLINE_MIN: u8 = 8;
47
48/// Encoded size of a small value reference: 2B block index + 2B size + 4B offset.
49pub(crate) const SMALL_VALUE_REF_SIZE: usize = 8;
50/// Encoded size of a medium value reference: 2B block index.
51pub(crate) const MEDIUM_VALUE_REF_SIZE: usize = 2;
52/// Encoded size of a blob value reference: 4B blob id.
53pub(crate) const BLOB_VALUE_REF_SIZE: usize = 4;
54/// Encoded size of a deleted (tombstone) value reference.
55pub(crate) const DELETED_VALUE_REF_SIZE: usize = 0;
56
57// Static assertion: MAX_INLINE_VALUE_SIZE must fit in the key type encoding.
58// Key types 8-255 encode inline values of size 0-247, so max is 255 - 8 = 247.
59const _: () = assert!(
60    MAX_INLINE_VALUE_SIZE <= (u8::MAX - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize,
61    "MAX_INLINE_VALUE_SIZE exceeds what can be encoded in key type byte"
62);
63
64/// The result of a lookup operation.
65pub enum SstLookupResult {
66    /// One or more values were found.
67    Found(SmallVec<[LookupValue; 1]>),
68    /// The key was not found.
69    NotFound,
70}
71
72impl From<LookupValue> for SstLookupResult {
73    fn from(value: LookupValue) -> Self {
74        SstLookupResult::Found(smallvec::smallvec![value])
75    }
76}
77
78#[derive(Clone, Default)]
79pub struct BlockWeighter;
80
81impl quick_cache::Weighter<(u32, u16), ArcBytes> for BlockWeighter {
82    fn weight(&self, _key: &(u32, u16), val: &ArcBytes) -> u64 {
83        if val.is_mmap_backed() {
84            // Mmap-backed blocks are cheap (just a pointer + Arc clone), so we
85            // assign a small fixed weight. Caching them avoids re-parsing block
86            // offsets on every lookup.
87            64
88        } else {
89            val.len() as u64 + 8
90        }
91    }
92}
93
94pub type BlockCache =
95    quick_cache::sync::Cache<(u32, u16), ArcBytes, BlockWeighter, BuildHasherDefault<FxHasher>>;
96
97/// Trait abstracting value block caching for `handle_key_match`.
98///
99/// Implemented by `&BlockCache` (global shared cache for lookups) and
100/// `&mut Option<(u16, ArcBytes)>` (lightweight single-entry cache for
101/// sequential iteration).
102trait ValueBlockCache {
103    fn get_or_read(self, sst: &StaticSortedFile, block_index: u16) -> Result<ArcBytes>;
104}
105
106impl ValueBlockCache for &BlockCache {
107    fn get_or_read(self, sst: &StaticSortedFile, block_index: u16) -> Result<ArcBytes> {
108        let this = &sst;
109        let block = match self.get_value_or_guard(&(this.meta.sequence_number, block_index), None) {
110            GuardResult::Value(block) => block,
111            GuardResult::Guard(guard) => {
112                let block = this.read_small_value_block(block_index)?;
113                let _ = guard.insert(block.clone());
114                block
115            }
116            GuardResult::Timeout => unreachable!(),
117        };
118        Ok(block)
119    }
120}
121
122impl ValueBlockCache for &mut Option<(u16, ArcBytes)> {
123    fn get_or_read(self, sst: &StaticSortedFile, block_index: u16) -> Result<ArcBytes> {
124        if let Some((idx, block)) = self.as_ref()
125            && *idx == block_index
126        {
127            return Ok(block.clone());
128        }
129        let block = sst.read_small_value_block(block_index)?;
130        *self = Some((block_index, block.clone()));
131        Ok(block)
132    }
133}
134
135#[derive(Clone, Copy, Debug)]
136pub struct StaticSortedFileMetaData {
137    /// The sequence number of this file.
138    pub sequence_number: u32,
139    /// The number of blocks in the SST file.
140    pub block_count: u16,
141}
142
143impl StaticSortedFileMetaData {
144    pub fn block_offsets_start(&self, sst_len: usize) -> usize {
145        let bc: usize = self.block_count.into();
146        sst_len - (bc * size_of::<u32>())
147    }
148}
149
150/// A memory mapped SST file.
151pub struct StaticSortedFile {
152    /// The meta file of this file.
153    meta: StaticSortedFileMetaData,
154    /// The memory mapped file.
155    /// We store as an Arc so we can hand out references (via ArcBytes) that can outlive this
156    /// struct (not that we expect them to outlive it by very much)
157    mmap: Arc<Mmap>,
158}
159
160impl StaticSortedFile {
161    /// Opens an SST file at the given path. This memory maps the file, but does not read it yet.
162    /// It's lazy read on demand.
163    pub fn open(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
164        let filename = format!("{:08}.sst", meta.sequence_number);
165        let path = db_path.join(&filename);
166        Self::open_internal(path, meta, false)
167            .with_context(|| format!("Unable to open static sorted file {filename}"))
168    }
169
170    /// Opens an SST file for compaction. Uses MADV_SEQUENTIAL instead of MADV_RANDOM,
171    /// since compaction reads blocks sequentially and benefits from OS read-ahead
172    /// and page reclamation.
173    pub fn open_for_compaction(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
174        let filename = format!("{:08}.sst", meta.sequence_number);
175        let path = db_path.join(&filename);
176        Self::open_internal(path, meta, true)
177            .with_context(|| format!("Unable to open static sorted file {filename}"))
178    }
179
180    fn open_internal(
181        path: PathBuf,
182        meta: StaticSortedFileMetaData,
183        sequential: bool,
184    ) -> Result<Self> {
185        let file = File::open(&path)
186            .with_context(|| format!("Failed to open SST file {}", path.display()))?;
187        let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
188            format!(
189                "Failed to mmap SST file {} ({} bytes)",
190                path.display(),
191                file.metadata().map(|m| m.len()).unwrap_or(0)
192            )
193        })?;
194        #[cfg(unix)]
195        if sequential {
196            mmap.advise(memmap2::Advice::Sequential)?;
197        } else {
198            mmap.advise(memmap2::Advice::Random)?;
199            let offset = meta.block_offsets_start(mmap.len());
200            let _ = mmap.advise_range(memmap2::Advice::Sequential, offset, mmap.len() - offset);
201        }
202        advise_mmap_for_persistence(&mmap)?;
203        let file = Self {
204            meta,
205            mmap: Arc::new(mmap),
206        };
207        Ok(file)
208    }
209
210    /// Consume this file and return an iterator over all entries in sorted order.
211    /// The iterator takes ownership of the SST file, so the mmap and its pages
212    /// are freed when the iterator is dropped.
213    pub fn try_into_iter(self) -> Result<StaticSortedFileIter> {
214        let block_count = self.meta.block_count;
215        let mut iter = StaticSortedFileIter {
216            this: self,
217            stack: Vec::new(),
218            current_key_block: None,
219            value_block_cache: None,
220        };
221        iter.enter_block(block_count - 1)?;
222        Ok(iter)
223    }
224
225    /// Looks up a key in this file.
226    ///
227    /// If `FIND_ALL` is false, returns after finding the first match.
228    /// If `FIND_ALL` is true, returns all entries with the same key (useful for
229    /// keyspaces where keys are hashes and collisions are possible).
230    pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
231        &self,
232        key_hash: u64,
233        key: &K,
234        key_block_cache: &BlockCache,
235        value_block_cache: &BlockCache,
236    ) -> Result<SstLookupResult> {
237        let mut current_block = self.meta.block_count - 1;
238        loop {
239            let mut key_block_arc = self.get_key_block(current_block, key_block_cache)?;
240            let block_type = key_block_arc.read_u8()?;
241            match block_type {
242                BLOCK_TYPE_INDEX => {
243                    current_block = self.lookup_index_block(&key_block_arc, key_hash)?;
244                }
245                BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
246                    let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH;
247                    return self.lookup_key_block::<K, FIND_ALL>(
248                        key_block_arc,
249                        key_hash,
250                        key,
251                        has_hash,
252                        value_block_cache,
253                    );
254                }
255                BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
256                    let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH;
257                    return self.lookup_fixed_key_block::<K, FIND_ALL>(
258                        key_block_arc,
259                        key_hash,
260                        key,
261                        has_hash,
262                        value_block_cache,
263                    );
264                }
265                _ => {
266                    bail!("Invalid block type");
267                }
268            }
269        }
270    }
271
272    /// Looks up a hash in a index block.
273    fn lookup_index_block(&self, mut block: &[u8], hash: u64) -> Result<u16> {
274        let first_block = block.read_u16::<BE>()?;
275        // Each entry is 10 bytes: 8 bytes for the hash, 2 bytes for the block index
276        let (entries, remainder) = block.as_chunks::<10>();
277        if entries.is_empty() {
278            return Ok(first_block);
279        }
280        if !remainder.is_empty() {
281            bail!("invalid index block, {} extra bytes", remainder.len())
282        }
283        match entries.binary_search_by(|entry| (&entry[..]).read_u64::<BE>().unwrap().cmp(&hash)) {
284            Ok(i) => Ok((&entries[i][8..]).read_u16::<BE>()?),
285            Err(0) => Ok(first_block),
286            Err(i) => Ok((&entries[i - 1][8..]).read_u16::<BE>()?),
287        }
288    }
289
290    /// Looks up a key in a key block and the value in a value block.
291    ///
292    /// If `FIND_ALL` is false, returns after finding the first match.
293    /// If `FIND_ALL` is true, collects all entries with the same key.
294    fn lookup_key_block<K: QueryKey, const FIND_ALL: bool>(
295        &self,
296        mut block: ArcBytes,
297        key_hash: u64,
298        key: &K,
299        has_hash: bool,
300        value_block_cache: &BlockCache,
301    ) -> Result<SstLookupResult> {
302        let hash_len: u8 = if has_hash { 8 } else { 0 };
303        let entry_count = block.read_u24::<BE>()? as usize;
304        let offsets = &block[..entry_count * 4];
305        let entries = &block[entry_count * 4..];
306
307        self.lookup_block_inner::<K, FIND_ALL>(
308            &block,
309            entry_count,
310            key_hash,
311            key,
312            value_block_cache,
313            |i| get_key_entry(offsets, entries, entry_count, i, hash_len),
314        )
315    }
316
317    /// Looks up a key in a fixed-size key block.
318    ///
319    /// Fixed-size key blocks store entries at predictable offsets (no offset table),
320    /// enabling direct indexing during binary search.
321    fn lookup_fixed_key_block<K: QueryKey, const FIND_ALL: bool>(
322        &self,
323        mut block: ArcBytes,
324        key_hash: u64,
325        key: &K,
326        has_hash: bool,
327        value_block_cache: &BlockCache,
328    ) -> Result<SstLookupResult> {
329        let hash_len: u8 = if has_hash { 8 } else { 0 };
330        let entry_count = block.read_u24::<BE>()? as usize;
331        let key_size = block.read_u8()? as usize;
332        let value_type = block.read_u8()?;
333        let val_size = entry_val_size(value_type)?;
334        let stride = hash_len as usize + key_size + val_size;
335        let entries = &block[..];
336
337        self.lookup_block_inner::<K, FIND_ALL>(
338            &block,
339            entry_count,
340            key_hash,
341            key,
342            value_block_cache,
343            |i| {
344                Ok(get_fixed_key_entry(
345                    entries, i, hash_len, key_size, value_type, stride,
346                ))
347            },
348        )
349    }
350
351    /// Shared binary search + collection logic for both key block variants.
352    ///
353    /// The `get_entry` closure abstracts over the difference between variable-size
354    /// key blocks (offset table lookup) and fixed-size key blocks (stride-based indexing).
355    fn lookup_block_inner<'a, K: QueryKey, const FIND_ALL: bool>(
356        &self,
357        block: &ArcBytes,
358        entry_count: usize,
359        key_hash: u64,
360        key: &K,
361        value_block_cache: &BlockCache,
362        get_entry: impl Fn(usize) -> Result<GetKeyEntryResult<'a>>,
363    ) -> Result<SstLookupResult> {
364        let mut l = 0;
365        let mut r = entry_count;
366        // binary search for a matching key
367        while l < r {
368            let m = (l + r) / 2;
369            let GetKeyEntryResult {
370                hash: mid_hash,
371                key: mid_key,
372                ty,
373                val,
374            } = get_entry(m)?;
375
376            let comparison = compare_hash_key(mid_hash, mid_key, key_hash, key);
377
378            match comparison {
379                Ordering::Less => r = m,
380                Ordering::Equal => {
381                    if !FIND_ALL {
382                        // SingleValue mode: each key has exactly one entry
383                        // this is enforced when writing
384                        let result = self.handle_key_match(ty, val, block, value_block_cache)?;
385                        return Ok(SstLookupResult::Found(SmallVec::from_buf([result])));
386                    }
387                    // FIND_ALL (MultiValue) mode: collect all values for this key.
388                    // Tombstones (Deleted) sort last within each key group, so we
389                    // scan backward to find the start of the key group, then forward
390                    // to collect all entries. The tombstone, if present, will be the
391                    // last entry in the results.
392                    let mut results = SmallVec::new();
393                    for i in (l..m).rev() {
394                        let GetKeyEntryResult {
395                            hash,
396                            key: entry_key,
397                            ty,
398                            val,
399                        } = get_entry(i)?;
400                        if !entry_matches_key(hash, entry_key, key_hash, key) {
401                            break;
402                        }
403                        results.push(self.handle_key_match(ty, val, block, value_block_cache)?);
404                    }
405                    // Technically we could `.reverse()` the items collected by the backwards
406                    // scan, but the only ordering constraint we need to maintain for single
407                    // sst multivalue reads is that a deleted token, if it exists comes last.
408                    // Because all the backwards scan items are strictly before the found item
409                    // we know they don't contain the _last_ item. So we don't care about
410                    // their order.
411
412                    // Add the entry at `m`
413                    results.push(self.handle_key_match(ty, val, block, value_block_cache)?);
414                    for i in (m + 1)..r {
415                        let GetKeyEntryResult {
416                            hash,
417                            key: entry_key,
418                            ty,
419                            val,
420                        } = get_entry(i)?;
421                        if !entry_matches_key(hash, entry_key, key_hash, key) {
422                            break;
423                        }
424                        results.push(self.handle_key_match(ty, val, block, value_block_cache)?);
425                    }
426                    return Ok(SstLookupResult::Found(results));
427                }
428                Ordering::Greater => l = m + 1,
429            }
430        }
431
432        Ok(SstLookupResult::NotFound)
433    }
434
435    /// Handles a key match by looking up the value.
436    fn handle_key_match(
437        &self,
438        ty: u8,
439        mut val: &[u8],
440        key_block_arc: &ArcBytes,
441        value_block_cache: impl ValueBlockCache,
442    ) -> Result<LookupValue> {
443        Ok(match ty {
444            KEY_BLOCK_ENTRY_TYPE_SMALL => {
445                let block = val.read_u16::<BE>()?;
446                let size = val.read_u16::<BE>()? as usize;
447                let position = val.read_u32::<BE>()? as usize;
448                let value = value_block_cache
449                    .get_or_read(self, block)?
450                    .slice(position..position + size);
451                LookupValue::Slice { value }
452            }
453            KEY_BLOCK_ENTRY_TYPE_MEDIUM => {
454                let block = val.read_u16::<BE>()?;
455                let value = self.read_value_block(block)?;
456                LookupValue::Slice { value }
457            }
458            KEY_BLOCK_ENTRY_TYPE_BLOB => {
459                let sequence_number = val.read_u32::<BE>()?;
460                LookupValue::Blob { sequence_number }
461            }
462            KEY_BLOCK_ENTRY_TYPE_DELETED => LookupValue::Deleted,
463            _ => {
464                // Inline value — val is already the correct slice
465                // SAFETY: val points into key_block_arc's data
466                let value = unsafe { key_block_arc.slice_from_subslice(val) };
467                LookupValue::Slice { value }
468            }
469        })
470    }
471
472    /// Gets a key block from the cache or reads it from the file.
473    fn get_key_block(
474        &self,
475        block: u16,
476        key_block_cache: &BlockCache,
477    ) -> Result<ArcBytes, anyhow::Error> {
478        Ok(
479            match key_block_cache.get_value_or_guard(&(self.meta.sequence_number, block), None) {
480                GuardResult::Value(block) => block,
481                GuardResult::Guard(guard) => {
482                    let block = self.read_key_block(block)?;
483                    let _ = guard.insert(block.clone());
484                    block
485                }
486                GuardResult::Timeout => unreachable!(),
487            },
488        )
489    }
490
491    /// Reads a key block from the file.
492    fn read_key_block(&self, block_index: u16) -> Result<ArcBytes> {
493        self.read_block(block_index)
494    }
495
496    /// Reads a small value block from the file.
497    fn read_small_value_block(&self, block_index: u16) -> Result<ArcBytes> {
498        self.read_block(block_index)
499    }
500
501    /// Reads a value block from the file.
502    fn read_value_block(&self, block_index: u16) -> Result<ArcBytes> {
503        self.read_block(block_index)
504    }
505
506    /// Verifies the CRC32 checksum of on-disk block data. Returns an error on mismatch.
507    fn verify_checksum(&self, data: &[u8], expected: u32, block_index: u16) -> Result<()> {
508        let actual = checksum_block(data);
509        if actual != expected {
510            bail!(
511                "Cache corruption detected: checksum mismatch in block {} of {:08}.sst (expected \
512                 {:08x}, got {:08x})",
513                block_index,
514                self.meta.sequence_number,
515                expected,
516                actual
517            );
518        }
519        Ok(())
520    }
521
522    /// Reads a block from the file, decompressing if needed, and verifies its checksum.
523    ///
524    /// The checksum is verified on the raw on-disk data **before** decompression, so
525    /// corruption is caught before passing data to LZ4.
526    #[tracing::instrument(level = "info", name = "reading database block", skip_all)]
527    fn read_block(&self, block_index: u16) -> Result<ArcBytes> {
528        let (uncompressed_length, expected_checksum, block) =
529            self.get_raw_block_slice(block_index).with_context(|| {
530                format!(
531                    "Failed to read raw block {} from {:08}.sst",
532                    block_index, self.meta.sequence_number
533                )
534            })?;
535
536        // Verify checksum on the raw on-disk data before decompression.
537        self.verify_checksum(block, expected_checksum, block_index)?;
538
539        // 0 means the block was not compressed, return the mmap-backed ArcBytes directly
540        if uncompressed_length == 0 {
541            return Ok(self.mmap_slice_to_arc_bytes(block));
542        }
543
544        // Advise Sequential only here: we're about to linearly scan the block
545        // through the decompressor. For uncompressed blocks (returned above)
546        // and lazy medium values (which call get_raw_block directly without
547        // decompressing), the file-level Random advice applies.
548        #[cfg(unix)]
549        let _ = self.mmap.advise_range(
550            memmap2::Advice::Sequential,
551            block.as_ptr() as usize - self.mmap.as_ptr() as usize,
552            block.len(),
553        );
554
555        let buffer = decompress_into_arc(uncompressed_length, block).with_context(|| {
556            format!(
557                "Failed to decompress block {} from {:08}.sst ({} bytes uncompressed)",
558                block_index, self.meta.sequence_number, uncompressed_length
559            )
560        })?;
561        Ok(ArcBytes::from(buffer))
562    }
563
564    /// Returns `(uncompressed_length, block_data)` as an owned `ArcBytes` backed by
565    /// the mmap. Only use this when the block data needs to outlive the current borrow
566    /// (e.g. medium values stored in `LookupEntry`).
567    fn get_raw_block(&self, block_index: u16) -> Result<(u32, u32, ArcBytes)> {
568        let (uncompressed_length, checksum, block) = self.get_raw_block_slice(block_index)?;
569        Ok((
570            uncompressed_length,
571            checksum,
572            self.mmap_slice_to_arc_bytes(block),
573        ))
574    }
575
576    /// Promotes a mmap subslice to an owned `ArcBytes`. This clones the `Arc<Mmap>`.
577    fn mmap_slice_to_arc_bytes(&self, subslice: &[u8]) -> ArcBytes {
578        // SAFETY: callers guarantee subslice points into self.mmap.
579        unsafe { ArcBytes::from_mmap(self.mmap.clone(), subslice) }
580    }
581
582    /// Gets the raw block slice directly from the memory mapped file, without
583    /// cloning the `Arc<Mmap>`. The returned slice borrows from the mmap.
584    fn get_raw_block_slice(&self, block_index: u16) -> Result<(u32, u32, &[u8])> {
585        #[cfg(feature = "strict_checks")]
586        if block_index >= self.meta.block_count {
587            bail!(
588                "Corrupted file seq:{} block:{} > number of blocks {} (block_offsets: {:x})",
589                self.meta.sequence_number,
590                block_index,
591                self.meta.block_count,
592                self.meta.block_offsets_start(self.mmap.len()),
593            );
594        }
595        let offset = self.meta.block_offsets_start(self.mmap.len()) + block_index as usize * 4;
596        #[cfg(feature = "strict_checks")]
597        if offset + 4 > self.mmap.len() {
598            bail!(
599                "Corrupted file seq:{} block:{} block offset locations {} + 4 bytes > file end {} \
600                 (block_offsets: {:x})",
601                self.meta.sequence_number,
602                block_index,
603                offset,
604                self.mmap.len(),
605                self.meta.block_offsets_start(self.mmap.len()),
606            );
607        }
608        let block_start = if block_index == 0 {
609            0
610        } else {
611            (&self.mmap[offset - 4..offset])
612                .read_u32::<BE>()
613                .with_context(|| {
614                    format!(
615                        "Failed to read block_start offset for block {} in {:08}.sst",
616                        block_index, self.meta.sequence_number
617                    )
618                })? as usize
619        };
620        let block_end = (&self.mmap[offset..offset + 4])
621            .read_u32::<BE>()
622            .with_context(|| {
623                format!(
624                    "Failed to read block_end offset for block {} in {:08}.sst",
625                    block_index, self.meta.sequence_number
626                )
627            })? as usize;
628        #[cfg(feature = "strict_checks")]
629        if block_end > self.mmap.len() || block_start > self.mmap.len() {
630            bail!(
631                "Corrupted file seq:{} block:{} block {} - {} > file end {} (block_offsets: {:x})",
632                self.meta.sequence_number,
633                block_index,
634                block_start,
635                block_end,
636                self.mmap.len(),
637                self.meta.block_offsets_start(self.mmap.len()),
638            );
639        }
640        let uncompressed_length = u32::from_be_bytes(
641            self.mmap[block_start..block_start + 4]
642                .try_into()
643                .with_context(|| {
644                    format!(
645                        "Failed to read uncompressed_length from block {} header in {:08}.sst",
646                        block_index, self.meta.sequence_number
647                    )
648                })?,
649        );
650        let checksum = u32::from_be_bytes(
651            self.mmap[block_start + 4..block_start + 8]
652                .try_into()
653                .with_context(|| {
654                    format!(
655                        "Failed to read checksum from block {} header in {:08}.sst",
656                        block_index, self.meta.sequence_number
657                    )
658                })?,
659        );
660        let block = &self.mmap[block_start + BLOCK_HEADER_SIZE..block_end];
661        Ok((uncompressed_length, checksum, block))
662    }
663}
664
665/// An iterator over all entries in a SST file in sorted order.
666pub struct StaticSortedFileIter {
667    this: StaticSortedFile,
668
669    stack: Vec<CurrentIndexBlock>,
670    current_key_block: Option<CurrentKeyBlock>,
671    /// Single-entry value block cache. Within a key block, entries reference
672    /// value blocks sequentially and don't revisit earlier blocks, so caching
673    /// just the current one avoids redundant decompression.
674    value_block_cache: Option<(u16, ArcBytes)>,
675}
676
677enum CurrentKeyBlockKind {
678    /// Variable-size entries with an offset table for random access.
679    Variable { offsets: ArcBytes, hash_len: u8 },
680    /// Fixed-size entries with uniform key size and value type (no offset table).
681    Fixed {
682        hash_len: u8,
683        key_size: usize,
684        value_type: u8,
685        stride: usize,
686    },
687}
688
689struct CurrentKeyBlock {
690    kind: CurrentKeyBlockKind,
691    entries: ArcBytes,
692    entry_count: usize,
693    index: usize,
694}
695
696struct CurrentIndexBlock {
697    entries: ArcBytes,
698    block_indices_count: usize,
699    index: usize,
700}
701
702impl Iterator for StaticSortedFileIter {
703    type Item = Result<LookupEntry>;
704
705    fn next(&mut self) -> Option<Self::Item> {
706        self.next_internal().transpose()
707    }
708}
709
710impl StaticSortedFileIter {
711    /// Enters a block at the given index.
712    fn enter_block(&mut self, block_index: u16) -> Result<()> {
713        let block_arc = self.this.read_key_block(block_index)?;
714        let mut block = &*block_arc;
715        let block_type = block.read_u8()?;
716        match block_type {
717            BLOCK_TYPE_INDEX => {
718                let block_indices_count = (block.len() + 8) / 10;
719                let range = 1..block_arc.len();
720                self.stack.push(CurrentIndexBlock {
721                    entries: block_arc.slice(range),
722                    block_indices_count,
723                    index: 0,
724                });
725            }
726            BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
727                let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH;
728                let hash_len = if has_hash { 8 } else { 0 };
729                let entry_count = block.read_u24::<BE>()? as usize;
730                let offsets_range = 4..4 + entry_count * 4;
731                let entries_range = 4 + entry_count * 4..block_arc.len();
732                let offsets = block_arc.clone().slice(offsets_range);
733                let entries = block_arc.slice(entries_range);
734                self.current_key_block = Some(CurrentKeyBlock {
735                    kind: CurrentKeyBlockKind::Variable { offsets, hash_len },
736                    entries,
737                    entry_count,
738                    index: 0,
739                });
740            }
741            BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
742                let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH;
743                let hash_len = if has_hash { 8 } else { 0 };
744                let entry_count = block.read_u24::<BE>()? as usize;
745                let key_size = block.read_u8()? as usize;
746                let value_type = block.read_u8()?;
747                let val_size = entry_val_size(value_type)?;
748                let stride = hash_len as usize + key_size + val_size;
749                // Header is 6 bytes for fixed-size blocks
750                let entries_range = 6..block_arc.len();
751                let entries = block_arc.slice(entries_range);
752                self.current_key_block = Some(CurrentKeyBlock {
753                    kind: CurrentKeyBlockKind::Fixed {
754                        hash_len,
755                        key_size,
756                        value_type,
757                        stride,
758                    },
759                    entries,
760                    entry_count,
761                    index: 0,
762                });
763            }
764            _ => {
765                bail!("Invalid block type");
766            }
767        }
768        Ok(())
769    }
770
771    /// Gets the next entry in the file and moves the cursor.
772    fn next_internal(&mut self) -> Result<Option<LookupEntry>> {
773        loop {
774            if let Some(CurrentKeyBlock {
775                kind,
776                entries,
777                entry_count,
778                index,
779            }) = self.current_key_block.take()
780            {
781                let GetKeyEntryResult { hash, key, ty, val } = match &kind {
782                    CurrentKeyBlockKind::Variable { offsets, hash_len } => {
783                        get_key_entry(offsets, &entries, entry_count, index, *hash_len)?
784                    }
785                    CurrentKeyBlockKind::Fixed {
786                        hash_len,
787                        key_size,
788                        value_type,
789                        stride,
790                    } => get_fixed_key_entry(
791                        &entries,
792                        index,
793                        *hash_len,
794                        *key_size,
795                        *value_type,
796                        *stride,
797                    ),
798                };
799                // Convert hash slice to u64, computing from key if no hash stored
800                let full_hash = if hash.is_empty() {
801                    crate::key::hash_key(&key)
802                } else {
803                    u64::from_be_bytes(hash.try_into().unwrap())
804                };
805                let value = if ty == KEY_BLOCK_ENTRY_TYPE_MEDIUM {
806                    let mut val = val;
807                    let block = val.read_u16::<BE>()?;
808                    let (uncompressed_size, checksum, block) = self.this.get_raw_block(block)?;
809                    LazyLookupValue::Medium {
810                        uncompressed_size,
811                        checksum,
812                        block,
813                    }
814                } else {
815                    let value = self.this.handle_key_match(
816                        ty,
817                        val,
818                        &entries,
819                        &mut self.value_block_cache,
820                    )?;
821                    LazyLookupValue::Eager(value)
822                };
823                let entry = LookupEntry {
824                    hash: full_hash,
825                    // SAFETY: key points into entries which is backed by the same Arc
826                    key: unsafe { entries.slice_from_subslice(key) },
827                    value,
828                };
829                if index + 1 < entry_count {
830                    self.current_key_block = Some(CurrentKeyBlock {
831                        kind,
832                        entries,
833                        entry_count,
834                        index: index + 1,
835                    });
836                }
837                return Ok(Some(entry));
838            }
839            if let Some(CurrentIndexBlock {
840                entries,
841                block_indices_count,
842                index,
843            }) = self.stack.pop()
844            {
845                let block_index = (&entries[index * 10..]).read_u16::<BE>()?;
846                if index + 1 < block_indices_count {
847                    self.stack.push(CurrentIndexBlock {
848                        entries,
849                        block_indices_count,
850                        index: index + 1,
851                    });
852                }
853                self.enter_block(block_index)?;
854            } else {
855                return Ok(None);
856            }
857        }
858    }
859}
860
861struct GetKeyEntryResult<'l> {
862    hash: &'l [u8],
863    key: &'l [u8],
864    ty: u8,
865    val: &'l [u8],
866}
867
868/// Compares a query (full_hash + query_key) against an entry (entry_hash + entry_key).
869/// Returns the ordering of query relative to entry.
870/// When entry_hash is empty, computes full hash from entry_key.
871fn compare_hash_key<K: QueryKey>(
872    entry_hash: &[u8],
873    entry_key: &[u8],
874    full_hash: u64,
875    query_key: &K,
876) -> Ordering {
877    if entry_hash.is_empty() {
878        // No hash stored - compute full hash from entry key
879        let entry_full_hash = crate::key::hash_key(&entry_key);
880        match full_hash.cmp(&entry_full_hash) {
881            Ordering::Equal => query_key.cmp(entry_key),
882            ord => ord,
883        }
884    } else {
885        // Full 8-byte hash stored - compare hashes first
886        let full_hash_bytes = full_hash.to_be_bytes();
887        match full_hash_bytes[..].cmp(entry_hash) {
888            Ordering::Equal => query_key.cmp(entry_key),
889            ord => ord,
890        }
891    }
892}
893
894/// Checks if a query key equals an entry key, optionally comparing stored hashes first.
895/// When a hash is stored (8 bytes), compares hashes before keys for speed.
896/// When no hash is stored, compares keys directly (avoiding hash recomputation).
897fn entry_matches_key<K: QueryKey>(
898    entry_hash: &[u8],
899    entry_key: &[u8],
900    full_hash: u64,
901    query_key: &K,
902) -> bool {
903    if entry_hash.is_empty() {
904        // No hash stored - compare keys directly instead of recomputing hash
905        query_key.cmp(entry_key) == Ordering::Equal
906    } else {
907        // Hash stored - cheap 8-byte comparison first, then key comparison
908        full_hash.to_be_bytes()[..] == *entry_hash && query_key.cmp(entry_key) == Ordering::Equal
909    }
910}
911
912/// Returns the byte size of the value portion for a given key block entry type.
913fn entry_val_size(ty: u8) -> Result<usize> {
914    match ty {
915        KEY_BLOCK_ENTRY_TYPE_SMALL => Ok(SMALL_VALUE_REF_SIZE),
916        KEY_BLOCK_ENTRY_TYPE_MEDIUM => Ok(MEDIUM_VALUE_REF_SIZE),
917        KEY_BLOCK_ENTRY_TYPE_BLOB => Ok(BLOB_VALUE_REF_SIZE),
918        KEY_BLOCK_ENTRY_TYPE_DELETED => Ok(DELETED_VALUE_REF_SIZE),
919        ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
920            Ok((ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize)
921        }
922        _ => bail!("Invalid key block entry type: {ty}"),
923    }
924}
925
926/// Reads a key entry from a key block.
927fn get_key_entry<'l>(
928    offsets: &[u8],
929    entries: &'l [u8],
930    entry_count: usize,
931    index: usize,
932    hash_len: u8,
933) -> Result<GetKeyEntryResult<'l>> {
934    let hash_len_usize = hash_len as usize;
935    let mut offset = &offsets[index * 4..];
936    let ty = offset.read_u8()?;
937    let start = offset.read_u24::<BE>()? as usize;
938    let end = if index == entry_count - 1 {
939        entries.len()
940    } else {
941        (&offsets[(index + 1) * 4 + 1..]).read_u24::<BE>()? as usize
942    };
943    // Return the raw hash bytes slice (0-8 bytes depending on hash_len)
944    let hash = &entries[start..start + hash_len_usize];
945    let val_size = entry_val_size(ty)?;
946    Ok(GetKeyEntryResult {
947        hash,
948        key: &entries[start + hash_len_usize..end - val_size],
949        ty,
950        val: &entries[end - val_size..end],
951    })
952}
953
954/// Reads a key entry from a fixed-size key block by direct indexing.
955///
956/// All entries have the same key size and value type, so positions are computed
957/// arithmetically with no offset table indirection.
958fn get_fixed_key_entry<'l>(
959    entries: &'l [u8],
960    index: usize,
961    hash_len: u8,
962    key_size: usize,
963    value_type: u8,
964    stride: usize,
965) -> GetKeyEntryResult<'l> {
966    let hash_len_usize = hash_len as usize;
967    let start = index * stride;
968    GetKeyEntryResult {
969        hash: &entries[start..start + hash_len_usize],
970        key: &entries[start + hash_len_usize..start + hash_len_usize + key_size],
971        ty: value_type,
972        val: &entries[start + hash_len_usize + key_size..(index + 1) * stride],
973    }
974}