1use std::{
2 cmp::Ordering,
3 fs::File,
4 hash::BuildHasherDefault,
5 path::Path,
6 rc::Rc,
7 sync::{
8 Arc,
9 atomic::{AtomicU64, Ordering as AtomicOrdering},
10 },
11};
12
13use anyhow::{Context, Result, bail, ensure};
14use memmap2::Mmap;
15use quick_cache::{Lifecycle, sync::GuardResult};
16use rustc_hash::FxHasher;
17use smallvec::SmallVec;
18
19use crate::{
20 QueryKey,
21 arc_bytes::ArcBytes,
22 be,
23 compression::checksum_block,
24 constants::MAX_INLINE_VALUE_SIZE,
25 lookup_entry::{IterValue, LookupEntry, LookupValue},
26 mmap_helper::advise_mmap_for_persistence,
27 rc_bytes::RcBytes,
28 shared_bytes::SharedBytes,
29 static_sorted_file_builder::{BLOCK_HEADER_SIZE, INDEX_BLOCK_ENTRY_SIZE},
30};
31
// Block type tags, stored in the first byte of a block. The key-block tags are dispatched on in
// `StaticSortedFile::lookup` and `StaticSortedFileIter::parse_key_block`.

/// Tag of an index block; the last block of the file is expected to carry it.
pub const BLOCK_TYPE_INDEX: u8 = 0;
/// Key block with variable-size entries, each starting with an 8-byte hash.
pub const BLOCK_TYPE_KEY_WITH_HASH: u8 = 1;
/// Key block with variable-size entries and no stored hash (hash length 0).
pub const BLOCK_TYPE_KEY_NO_HASH: u8 = 2;
/// Key block with fixed-stride entries, each starting with an 8-byte hash.
pub const BLOCK_TYPE_FIXED_KEY_WITH_HASH: u8 = 3;
/// Key block with fixed-stride entries and no stored hash (hash length 0).
pub const BLOCK_TYPE_FIXED_KEY_NO_HASH: u8 = 4;

// Key block entry types. They select how the value bytes of an entry are interpreted
// (see `handle_key_match_generic`) and how many value bytes there are (see `entry_val_size`).

/// The value is a slice of a value block; the reference holds block index, size and position.
pub const KEY_BLOCK_ENTRY_TYPE_SMALL: u8 = 0;
/// The value is a blob; the reference holds its sequence number.
pub const KEY_BLOCK_ENTRY_TYPE_BLOB: u8 = 1;
/// The key is marked as deleted; no value bytes follow.
pub const KEY_BLOCK_ENTRY_TYPE_DELETED: u8 = 2;
/// The value is an entire block; the reference holds its block index.
pub const KEY_BLOCK_ENTRY_TYPE_MEDIUM: u8 = 3;
/// Smallest type value denoting an inline value; the inline length is
/// `ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN`.
pub const KEY_BLOCK_ENTRY_TYPE_INLINE_MIN: u8 = 8;

/// Bytes in a small-value reference: `u16` block index, `u16` size, `u32` position.
pub(crate) const SMALL_VALUE_REF_SIZE: usize = 8;
/// Bytes in a medium-value reference: `u16` block index.
pub(crate) const MEDIUM_VALUE_REF_SIZE: usize = 2;
/// Bytes in a blob reference: `u32` sequence number.
pub(crate) const BLOB_VALUE_REF_SIZE: usize = 4;
/// Deleted entries carry no value bytes.
pub(crate) const DELETED_VALUE_REF_SIZE: usize = 0;

// Compile-time check: the largest inline value length, added to
// `KEY_BLOCK_ENTRY_TYPE_INLINE_MIN`, must still fit into the single type byte.
const _: () = assert!(
    MAX_INLINE_VALUE_SIZE <= (u8::MAX - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize,
    "MAX_INLINE_VALUE_SIZE exceeds what can be encoded in key type byte"
);
69
70pub enum SstLookupResult {
72 Found(SmallVec<[LookupValue; 1]>),
74 NotFound,
76}
77
78impl From<LookupValue> for SstLookupResult {
79 fn from(value: LookupValue) -> Self {
80 SstLookupResult::Found(smallvec::smallvec![value])
81 }
82}
83
84#[derive(Clone, Default)]
85pub struct BlockWeighter;
86
87impl quick_cache::Weighter<(u32, u16), ArcBytes> for BlockWeighter {
88 fn weight(&self, _key: &(u32, u16), val: &ArcBytes) -> u64 {
89 if val.is_mmap_backed() {
90 debug_assert!(
93 !val.is_mmap_backed(),
94 "mmap-backed block should not be inserted into BlockCache"
95 );
96 64
97 } else {
98 val.len() as u64 + 8
99 }
100 }
101}
102
/// Lifecycle hooks for [`BlockCache`]. Only `is_pinned` carries logic; the other hooks are no-ops.
#[derive(Clone, Default)]
pub struct BlockCacheLifecycle;

impl Lifecycle<(u32, u16), ArcBytes> for BlockCacheLifecycle {
    // No per-request state is needed.
    type RequestState = ();

    /// Reports an entry as pinned whenever `ArcBytes::is_shared_arc` returns true for it.
    // NOTE(review): presumably this keeps blocks that are still referenced outside the cache
    // from being evicted — confirm against `ArcBytes::is_shared_arc` and quick_cache's docs.
    #[inline]
    fn is_pinned(&self, _key: &(u32, u16), val: &ArcBytes) -> bool {
        val.is_shared_arc()
    }

    /// Nothing to set up at the start of a request.
    #[inline]
    fn begin_request(&self) -> Self::RequestState {}

    /// Evicted entries are simply dropped.
    #[inline]
    fn on_evict(&self, _state: &mut Self::RequestState, _key: (u32, u16), _val: ArcBytes) {}
}
122
/// Concurrent cache of blocks. Code in this file keys it by `(sequence_number, block_index)`,
/// weighs entries with [`BlockWeighter`], hashes with `FxHasher` and uses
/// [`BlockCacheLifecycle`] for lifecycle hooks.
pub type BlockCache = quick_cache::sync::Cache<
    (u32, u16),
    ArcBytes,
    BlockWeighter,
    BuildHasherDefault<FxHasher>,
    BlockCacheLifecycle,
>;
130
/// Source of value blocks for `handle_key_match_generic`, abstracting over how previously read
/// blocks are reused.
trait ValueBlockCache<B: SharedBytes> {
    /// Returns block `block_index` of the file described by `meta`, reading it through `mmap`
    /// when it is not already available.
    ///
    /// Takes `self` by value; the implementors in this file are a `Copy` struct and a `&mut`
    /// reference.
    fn get_or_read(
        self,
        mmap: &B::MmapHandle,
        meta: &StaticSortedFileMetaData,
        block_index: u16,
    ) -> Result<B>;
}
145
/// [`ValueBlockCache`] implementation for the lookup path: reads go through a shared
/// [`BlockCache`] and the per-file bitmap of already verified blocks.
#[derive(Clone, Copy)]
struct ArcBlockCacheReader<'a> {
    /// Cache consulted (and filled) by `get_or_cache_block`.
    cache: &'a BlockCache,
    /// Bitmap passed on to `get_or_cache_block` for checksum bookkeeping.
    verified_blocks: &'a [AtomicU64],
}

impl ValueBlockCache<ArcBytes> for ArcBlockCacheReader<'_> {
    /// Delegates to `get_or_cache_block` with the stored cache and bitmap.
    fn get_or_read(
        self,
        mmap: &Arc<Mmap>,
        meta: &StaticSortedFileMetaData,
        block_index: u16,
    ) -> Result<ArcBytes> {
        get_or_cache_block(mmap, meta, block_index, self.cache, self.verified_blocks)
    }
}
166
167impl ValueBlockCache<RcBytes> for &mut Option<(u16, RcBytes)> {
169 fn get_or_read(
170 self,
171 mmap: &Rc<Mmap>,
172 meta: &StaticSortedFileMetaData,
173 block_index: u16,
174 ) -> Result<RcBytes> {
175 if let Some((idx, block)) = self.as_ref()
176 && *idx == block_index
177 {
178 return Ok(block.clone());
179 }
180 let block: RcBytes = read_block_generic(mmap, meta, block_index)?;
181 *self = Some((block_index, block.clone()));
182 Ok(block)
183 }
184}
185
/// Identifying metadata of a static sorted file.
#[derive(Clone, Copy, Debug)]
pub struct StaticSortedFileMetaData {
    /// Sequence number of the file; the file name is derived from it (`{:08}.sst`).
    pub sequence_number: u32,
    /// Number of blocks stored in the file.
    pub block_count: u16,
}

impl StaticSortedFileMetaData {
    /// Returns the byte position where the trailing block offset table starts in a file of
    /// `sst_len` bytes. The table holds one `u32` per block.
    ///
    /// `sst_len` must be at least `block_count * 4`; otherwise the subtraction underflows.
    pub fn block_offsets_start(&self, sst_len: usize) -> usize {
        let table_len = usize::from(self.block_count) * std::mem::size_of::<u32>();
        sst_len - table_len
    }
}
200
/// A memory-mapped static sorted file (SST) opened for key lookups.
pub struct StaticSortedFile {
    /// Sequence number and block count of this file.
    meta: StaticSortedFileMetaData,
    /// Memory map of the whole file.
    mmap: Arc<Mmap>,
    /// Bitmap with one bit per block (64 blocks per word). A set bit records that the block's
    /// checksum has already been verified, see `verify_checksum_once`.
    verified_blocks: Box<[AtomicU64]>,
}
215
216impl StaticSortedFile {
217 pub fn open(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
220 let filename = format!("{:08}.sst", meta.sequence_number);
221 let path = db_path.join(&filename);
222 let file = File::open(&path)
223 .with_context(|| format!("Failed to open SST file {}", path.display()))?;
224 let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
225 format!(
226 "Failed to mmap SST file {} ({} bytes)",
227 path.display(),
228 file.metadata().map(|m| m.len()).unwrap_or(0)
229 )
230 })?;
231 #[cfg(unix)]
232 {
233 mmap.advise(memmap2::Advice::Random)?;
234 let offset = meta.block_offsets_start(mmap.len());
235 let _ = mmap.advise_range(memmap2::Advice::Sequential, offset, mmap.len() - offset);
236 }
237 advise_mmap_for_persistence(&mmap)?;
238 let bitmap_words = (meta.block_count as usize).div_ceil(u64::BITS as usize);
239 let verified_blocks = (0..bitmap_words)
240 .map(|_| AtomicU64::new(0))
241 .collect::<Box<[_]>>();
242 Ok(Self {
243 meta,
244 mmap: Arc::new(mmap),
245 verified_blocks,
246 })
247 }
248
249 pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
255 &self,
256 key_hash: u64,
257 key: &K,
258 key_block_cache: &BlockCache,
259 value_block_cache: &BlockCache,
260 ) -> Result<SstLookupResult> {
261 let index_block_index = self.meta.block_count - 1;
264 let index_block = get_or_cache_block(
265 &self.mmap,
266 &self.meta,
267 index_block_index,
268 key_block_cache,
269 &self.verified_blocks,
270 )?;
271 let key_block_index = self.lookup_index_block(&index_block, key_hash)?;
272
273 let key_block_arc = get_or_cache_block(
274 &self.mmap,
275 &self.meta,
276 key_block_index,
277 key_block_cache,
278 &self.verified_blocks,
279 )?;
280 let reader = ArcBlockCacheReader {
281 cache: value_block_cache,
282 verified_blocks: &self.verified_blocks,
283 };
284 let block_type = be::read_u8(&key_block_arc);
285 match block_type {
286 BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
287 let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH;
288 self.lookup_key_block::<K, FIND_ALL>(key_block_arc, key_hash, key, has_hash, reader)
289 }
290
291 BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
292 let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH;
293 self.lookup_fixed_key_block::<K, FIND_ALL>(
294 key_block_arc,
295 key_hash,
296 key,
297 has_hash,
298 reader,
299 )
300 }
301 _ => {
302 bail!("Invalid block type");
303 }
304 }
305 }
306
307 fn lookup_index_block(&self, block: &[u8], hash: u64) -> Result<u16> {
309 ensure!(block.len() >= 3, "index block too short");
310 debug_assert!(
311 be::read_u8(block) == BLOCK_TYPE_INDEX,
312 "expected index block as last block"
313 );
314 let first_block = be::read_u16(&block[1..]);
315 let (entries, remainder) = block[3..].as_chunks::<INDEX_BLOCK_ENTRY_SIZE>();
316 if entries.is_empty() {
317 return Ok(first_block);
318 }
319 if !remainder.is_empty() {
320 bail!("invalid index block, {} extra bytes", remainder.len())
321 }
322 match entries.binary_search_by(|entry| be::read_u64(entry).cmp(&hash)) {
323 Ok(i) => Ok(be::read_u16(&entries[i][8..])),
324 Err(0) => Ok(first_block),
325 Err(i) => Ok(be::read_u16(&entries[i - 1][8..])),
326 }
327 }
328
329 fn lookup_key_block<K: QueryKey, const FIND_ALL: bool>(
334 &self,
335 block: ArcBytes,
336 key_hash: u64,
337 key: &K,
338 has_hash: bool,
339 reader: ArcBlockCacheReader<'_>,
340 ) -> Result<SstLookupResult> {
341 let hash_len: u8 = if has_hash { 8 } else { 0 };
342 ensure!(block.len() >= 4, "key block too short");
343 let entry_count = be::read_u24(&block[1..]) as usize;
344 let data = &block[4..];
345 ensure!(
346 data.len() >= entry_count * 4,
347 "key block too short for {entry_count} entries"
348 );
349 let offsets = &data[..entry_count * 4];
350 let entries = &data[entry_count * 4..];
351
352 self.lookup_block_inner::<K, FIND_ALL>(&block, entry_count, key_hash, key, reader, |i| {
353 get_key_entry(offsets, entries, entry_count, i, hash_len)
354 })
355 }
356
357 fn lookup_fixed_key_block<K: QueryKey, const FIND_ALL: bool>(
362 &self,
363 block: ArcBytes,
364 key_hash: u64,
365 key: &K,
366 has_hash: bool,
367 reader: ArcBlockCacheReader<'_>,
368 ) -> Result<SstLookupResult> {
369 let hash_len: u8 = if has_hash { 8 } else { 0 };
370 ensure!(block.len() >= 6, "fixed key block too short");
371 let entry_count = be::read_u24(&block[1..]) as usize;
372 let key_size = be::read_u8(&block[4..]) as usize;
373 let value_type = be::read_u8(&block[5..]);
374 let val_size = entry_val_size(value_type)?;
375 let stride = hash_len as usize + key_size + val_size;
376 let entries = &block[6..];
377 ensure!(
378 entries.len() == entry_count * stride,
379 "fixed key block for {entry_count} entries must is the wrong size"
380 );
381
382 self.lookup_block_inner::<K, FIND_ALL>(&block, entry_count, key_hash, key, reader, |i| {
383 Ok(get_fixed_key_entry(
384 entries, i, hash_len, key_size, value_type, stride,
385 ))
386 })
387 }
388
    /// Binary search shared by both key block layouts.
    ///
    /// `get_entry(i)` decodes entry `i` (for `i < entry_count`) of `block`. Entries are assumed
    /// to be sorted consistently with `compare_hash_key`. With `FIND_ALL == false` the first
    /// matching entry hit by the search is returned; otherwise all neighbouring matches are
    /// collected too. Returns `NotFound` when no entry matches.
    fn lookup_block_inner<'a, K: QueryKey, const FIND_ALL: bool>(
        &self,
        block: &ArcBytes,
        entry_count: usize,
        key_hash: u64,
        key: &K,
        reader: ArcBlockCacheReader<'_>,
        get_entry: impl Fn(usize) -> Result<GetKeyEntryResult<'a>>,
    ) -> Result<SstLookupResult> {
        // Half-open search window [l, r).
        let mut l = 0;
        let mut r = entry_count;
        while l < r {
            let m = (l + r) / 2;
            let GetKeyEntryResult {
                hash: mid_hash,
                key: mid_key,
                ty,
                val,
            } = get_entry(m)?;

            // Ordering of the query relative to the entry at `m`.
            let comparison = compare_hash_key(mid_hash, mid_key, key_hash, key);

            match comparison {
                // Query sorts before the entry: continue in the left half.
                Ordering::Less => r = m,
                Ordering::Equal => {
                    if !FIND_ALL {
                        let result = self.handle_key_match(ty, val, block, reader)?;
                        return Ok(SstLookupResult::Found(SmallVec::from_buf([result])));
                    }
                    let mut results = SmallVec::new();
                    // Walk backwards from `m - 1` while entries still match. These matches are
                    // pushed in descending index order.
                    for i in (l..m).rev() {
                        let GetKeyEntryResult {
                            hash,
                            key: entry_key,
                            ty,
                            val,
                        } = get_entry(i)?;
                        if !entry_matches_key(hash, entry_key, key_hash, key) {
                            break;
                        }
                        results.push(self.handle_key_match(ty, val, block, reader)?);
                    }
                    // The entry at `m` itself (`ty`/`val` from the outer binding).
                    results.push(self.handle_key_match(ty, val, block, reader)?);
                    // Walk forwards from `m + 1` while entries still match.
                    for i in (m + 1)..r {
                        let GetKeyEntryResult {
                            hash,
                            key: entry_key,
                            ty,
                            val,
                        } = get_entry(i)?;
                        if !entry_matches_key(hash, entry_key, key_hash, key) {
                            break;
                        }
                        results.push(self.handle_key_match(ty, val, block, reader)?);
                    }
                    return Ok(SstLookupResult::Found(results));
                }
                // Query sorts after the entry: continue in the right half.
                Ordering::Greater => l = m + 1,
            }
        }

        Ok(SstLookupResult::NotFound)
    }
472
    /// Resolves the value of a matched entry by forwarding to `handle_key_match_generic` with
    /// this file's mmap and metadata.
    fn handle_key_match(
        &self,
        ty: u8,
        val: &[u8],
        key_block_arc: &ArcBytes,
        reader: ArcBlockCacheReader<'_>,
    ) -> Result<LookupValue> {
        handle_key_match_generic(&self.mmap, &self.meta, ty, val, key_block_arc, reader)
    }
483}
484
485fn get_or_cache_block(
494 mmap: &Arc<Mmap>,
495 meta: &StaticSortedFileMetaData,
496 block_index: u16,
497 cache: &BlockCache,
498 verified_blocks: &[AtomicU64],
499) -> Result<ArcBytes> {
500 let (uncompressed_length, checksum, block_data) = get_raw_block_slice(mmap, meta, block_index)
501 .with_context(|| {
502 format!(
503 "Failed to read raw block {} from {:08}.sst",
504 block_index, meta.sequence_number
505 )
506 })?;
507
508 if uncompressed_length == 0 {
509 verify_checksum_once(meta, block_data, checksum, block_index, verified_blocks)?;
511 return Ok(unsafe { ArcBytes::from_mmap(mmap, block_data) });
513 }
514
515 Ok(
517 match cache.get_value_or_guard(&(meta.sequence_number, block_index), None) {
518 GuardResult::Value(block) => block,
519 GuardResult::Guard(guard) => {
520 verify_checksum_once(meta, block_data, checksum, block_index, verified_blocks)?;
523 let block = ArcBytes::from_decompressed(uncompressed_length, block_data)
524 .with_context(|| {
525 format!(
526 "Failed to decompress block {} from {:08}.sst ({} bytes uncompressed)",
527 block_index, meta.sequence_number, uncompressed_length
528 )
529 })?;
530 let _ = guard.insert(block.clone());
531 block
532 }
533 GuardResult::Timeout => unreachable!(),
534 },
535 )
536}
537
/// Locates block `block_index` inside the mapped file without verifying or decompressing it.
///
/// The file ends with a table holding one `u32` per block: entry `i` is the end position of
/// block `i`, and a block starts where its predecessor ends (block 0 starts at 0). Each block
/// begins with a header of `BLOCK_HEADER_SIZE` bytes containing the uncompressed length and the
/// checksum, both `u32`.
///
/// Returns `(uncompressed_length, checksum, payload)`, where `payload` is the block's bytes
/// after the header.
///
/// # Errors
/// Fails when the block is too small to contain its header. Further consistency checks are
/// only compiled in with the `strict_checks` feature; without it, out-of-range positions lead
/// to a panic from slice indexing.
fn get_raw_block_slice<'a>(
    mmap: &'a Mmap,
    meta: &StaticSortedFileMetaData,
    block_index: u16,
) -> Result<(u32, u32, &'a [u8])> {
    #[cfg(feature = "strict_checks")]
    if block_index >= meta.block_count {
        bail!(
            "Corrupted file seq:{} block:{} > number of blocks {} (block_offsets: {:x})",
            meta.sequence_number,
            block_index,
            meta.block_count,
            meta.block_offsets_start(mmap.len()),
        );
    }
    // Position of this block's entry in the trailing offset table.
    let offset = meta.block_offsets_start(mmap.len()) + block_index as usize * 4;
    #[cfg(feature = "strict_checks")]
    if offset + 4 > mmap.len() {
        bail!(
            "Corrupted file seq:{} block:{} block offset locations {} + 4 bytes > file end {} \
             (block_offsets: {:x})",
            meta.sequence_number,
            block_index,
            offset,
            mmap.len(),
            meta.block_offsets_start(mmap.len()),
        );
    }
    // The start of a block is the end of the previous one; the first block starts at 0.
    let block_start = if block_index == 0 {
        0
    } else {
        be::read_u32(&mmap[offset - 4..]) as usize
    };
    let block_end = be::read_u32(&mmap[offset..]) as usize;
    #[cfg(feature = "strict_checks")]
    if block_end > mmap.len() || block_start > mmap.len() {
        bail!(
            "Corrupted file seq:{} block:{} block {} - {} > file end {} (block_offsets: {:x})",
            meta.sequence_number,
            block_index,
            block_start,
            block_end,
            mmap.len(),
            meta.block_offsets_start(mmap.len()),
        );
    }
    ensure!(
        block_start + BLOCK_HEADER_SIZE <= block_end,
        "block {} header truncated in {:08}.sst",
        block_index,
        meta.sequence_number
    );
    // Header: uncompressed length at offset 0, checksum at offset 4.
    let uncompressed_length = be::read_u32(&mmap[block_start..]);
    let checksum = be::read_u32(&mmap[block_start + 4..]);
    let block = &mmap[block_start + BLOCK_HEADER_SIZE..block_end];
    Ok((uncompressed_length, checksum, block))
}
597
598fn verify_checksum(
600 meta: &StaticSortedFileMetaData,
601 data: &[u8],
602 expected: u32,
603 block_index: u16,
604) -> Result<()> {
605 let actual = checksum_block(data);
606 if actual != expected {
607 bail!(
608 "Cache corruption detected: checksum mismatch in block {} of {:08}.sst (expected \
609 {:08x}, got {:08x})",
610 block_index,
611 meta.sequence_number,
612 expected,
613 actual
614 );
615 }
616 Ok(())
617}
618
619fn verify_checksum_once(
626 meta: &StaticSortedFileMetaData,
627 data: &[u8],
628 expected: u32,
629 block_index: u16,
630 verified_blocks: &[AtomicU64],
631) -> Result<()> {
632 let word_idx = block_index as usize / u64::BITS as usize;
633 let bit = 1u64 << (block_index as usize % u64::BITS as usize);
634 if verified_blocks[word_idx].load(AtomicOrdering::Relaxed) & bit != 0 {
635 return Ok(());
636 }
637 verify_checksum(meta, data, expected, block_index)?;
638 verified_blocks[word_idx].fetch_or(bit, AtomicOrdering::Relaxed);
639 Ok(())
640}
641
642fn get_raw_block_generic<B: SharedBytes>(
645 mmap: &B::MmapHandle,
646 meta: &StaticSortedFileMetaData,
647 block_index: u16,
648) -> Result<(u32, u32, B)> {
649 let (uncompressed_length, checksum, block) = get_raw_block_slice(mmap, meta, block_index)?;
650 Ok((uncompressed_length, checksum, unsafe {
652 B::from_mmap(mmap, block)
653 }))
654}
655
656#[tracing::instrument(level = "info", name = "reading database block", skip_all)]
659fn read_block_generic<B: SharedBytes>(
660 mmap: &B::MmapHandle,
661 meta: &StaticSortedFileMetaData,
662 block_index: u16,
663) -> Result<B> {
664 let (uncompressed_length, expected_checksum, block) =
665 get_raw_block_slice(mmap, meta, block_index).with_context(|| {
666 format!(
667 "Failed to read raw block {} from {:08}.sst",
668 block_index, meta.sequence_number
669 )
670 })?;
671
672 verify_checksum(meta, block, expected_checksum, block_index)?;
673
674 if uncompressed_length == 0 {
675 return Ok(unsafe { B::from_mmap(mmap, block) });
677 }
678
679 let buffer = B::from_decompressed(uncompressed_length, block).with_context(|| {
680 format!(
681 "Failed to decompress block {} from {:08}.sst ({} bytes uncompressed)",
682 block_index, meta.sequence_number, uncompressed_length
683 )
684 })?;
685 Ok(buffer)
686}
687
688fn handle_key_match_generic<B: SharedBytes>(
690 mmap: &B::MmapHandle,
691 meta: &StaticSortedFileMetaData,
692 ty: u8,
693 val: &[u8],
694 key_block: &B,
695 reader: impl ValueBlockCache<B>,
696) -> Result<LookupValue<B>> {
697 Ok(match ty {
698 KEY_BLOCK_ENTRY_TYPE_SMALL => {
699 let block = be::read_u16(val);
700 let size = be::read_u16(&val[2..]) as usize;
701 let position = be::read_u32(&val[4..]) as usize;
702 let value = reader
703 .get_or_read(mmap, meta, block)?
704 .slice(position..position + size);
705 LookupValue::Slice { value }
706 }
707 KEY_BLOCK_ENTRY_TYPE_MEDIUM => {
708 let block = be::read_u16(val);
709 let value = read_block_generic(mmap, meta, block)?;
710 LookupValue::Slice { value }
711 }
712 KEY_BLOCK_ENTRY_TYPE_BLOB => {
713 let sequence_number = be::read_u32(val);
714 LookupValue::Blob { sequence_number }
715 }
716 KEY_BLOCK_ENTRY_TYPE_DELETED => LookupValue::Deleted,
717 _ => {
718 let value = unsafe { key_block.slice_from_subslice(val) };
721 LookupValue::Slice { value }
722 }
723 })
724}
725
/// Iterator over all entries of a static sorted file, visiting the key blocks in the order in
/// which the index block lists them.
pub struct StaticSortedFileIter {
    /// Memory map of the whole file.
    mmap: Rc<Mmap>,
    /// Sequence number and block count of the file.
    meta: StaticSortedFileMetaData,

    /// The root index block without its leading type byte.
    index_entries: RcBytes,
    /// Number of key blocks referenced by the index block, including the first child.
    num_index_entries: usize,
    /// Position of the next index entry to load; starts at 1 because the first child is
    /// loaded on construction.
    index_pos: usize,
    /// The key block currently being iterated.
    current_key_block: CurrentKeyBlock,
    /// The most recently read value block together with its block index.
    value_block_cache: Option<(u16, RcBytes)>,
}
747
/// Layout-specific data needed to decode the entries of the current key block.
enum CurrentKeyBlockKind {
    /// Variable-size entries: `offsets` is the offset table (4 bytes per entry) and `hash_len`
    /// the number of hash bytes at the start of each entry (8 or 0).
    Variable { offsets: RcBytes, hash_len: u8 },
    /// Fixed-stride entries.
    Fixed {
        /// Number of hash bytes at the start of each entry (8 or 0).
        hash_len: u8,
        /// Size of every key in bytes.
        key_size: usize,
        /// Entry type shared by all entries of the block.
        value_type: u8,
        /// Total size of one entry: hash, key and value bytes.
        stride: usize,
    },
}
759
/// A parsed key block together with the iteration position inside it.
struct CurrentKeyBlock {
    /// Layout of the block and the data needed to decode entries.
    kind: CurrentKeyBlockKind,
    /// The entry data of the block (everything after the header and, if any, the offset table).
    entries: RcBytes,
    /// Number of entries in the block.
    entry_count: u32,
    /// Index of the next entry to yield.
    index: u32,
}
768
impl Iterator for StaticSortedFileIter {
    type Item = Result<LookupEntry>;

    /// Yields the next entry, or an error if decoding failed. `next_internal` returns
    /// `Result<Option<_>>`, which is transposed into the `Option<Result<_>>` an iterator needs.
    fn next(&mut self) -> Option<Self::Item> {
        self.next_internal().transpose()
    }
}
776
777impl StaticSortedFileIter {
778 pub fn open(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
782 let filename = format!("{:08}.sst", meta.sequence_number);
783 let path = db_path.join(&filename);
784 let file = File::open(&path)
785 .with_context(|| format!("Failed to open SST file {}", path.display()))?;
786 let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
787 format!(
788 "Failed to mmap SST file {} ({} bytes)",
789 path.display(),
790 file.metadata().map(|m| m.len()).unwrap_or(0)
791 )
792 })?;
793 #[cfg(unix)]
794 mmap.advise(memmap2::Advice::Sequential)?;
795 advise_mmap_for_persistence(&mmap)?;
796 Self::new(Rc::new(mmap), meta)
797 .with_context(|| format!("Unable to open static sorted file {filename}"))
798 }
799
800 fn new(mmap: Rc<Mmap>, meta: StaticSortedFileMetaData) -> Result<Self> {
801 let root_block_index = meta.block_count - 1;
802 let block: RcBytes = read_block_generic(&mmap, &meta, root_block_index)?;
803 let block_type = block[0];
804
805 if block_type != BLOCK_TYPE_INDEX {
807 bail!("Root block must be an index block");
808 }
809 let block_len = block.len();
810 ensure!(block_len >= 3, "index block too short");
811 let index_entries = block.slice(1..block_len);
812 let first_child = be::read_u16(&index_entries);
813 let num_index_entries: usize = (index_entries.len() + INDEX_BLOCK_ENTRY_SIZE
818 - size_of::<u16>())
819 / INDEX_BLOCK_ENTRY_SIZE;
820
821 let current_key_block = Self::parse_key_block(&mmap, &meta, first_child)?;
822 Ok(StaticSortedFileIter {
823 mmap,
824 meta,
825 index_entries,
826 num_index_entries,
827 index_pos: 1,
828 current_key_block,
829 value_block_cache: None,
830 })
831 }
832
833 fn parse_key_block(
835 mmap: &Rc<Mmap>,
836 meta: &StaticSortedFileMetaData,
837 block_index: u16,
838 ) -> Result<CurrentKeyBlock> {
839 let block: RcBytes = read_block_generic(mmap, meta, block_index)?;
840 let data = &*block;
841 ensure!(data.len() >= 4, "key block too short");
842 let block_type = data[0];
843 let entry_count = be::read_u24(&data[1..]);
844 match block_type {
845 BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
846 let hash_len = if block_type == BLOCK_TYPE_KEY_WITH_HASH {
847 8
848 } else {
849 0
850 };
851 let n = entry_count as usize;
852 let offsets_range = 4..4 + n * 4;
853 let entries_range = 4 + n * 4..block.len();
854 let offsets = block.clone().slice(offsets_range);
855 let entries = block.slice(entries_range);
856 Ok(CurrentKeyBlock {
857 kind: CurrentKeyBlockKind::Variable { offsets, hash_len },
858 entries,
859 entry_count,
860 index: 0,
861 })
862 }
863 BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
864 let hash_len = if block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH {
865 8
866 } else {
867 0
868 };
869 let key_size = data[4] as usize;
870 let value_type = data[5];
871 let val_size = entry_val_size(value_type)?;
872 let stride = hash_len as usize + key_size + val_size;
873 let entries_range = 6..block.len();
875 let entries = block.slice(entries_range);
876 Ok(CurrentKeyBlock {
877 kind: CurrentKeyBlockKind::Fixed {
878 hash_len,
879 key_size,
880 value_type,
881 stride,
882 },
883 entries,
884 entry_count,
885 index: 0,
886 })
887 }
888 _ => {
889 bail!("Invalid key block type: {block_type}");
890 }
891 }
892 }
893
    /// Produces the next entry, moving on to the next key block whenever the current one is
    /// exhausted. Returns `Ok(None)` once all key blocks listed in the index have been visited.
    fn next_internal(&mut self) -> Result<Option<LookupEntry>> {
        loop {
            let kb = &mut self.current_key_block;
            if kb.index < kb.entry_count {
                let index = kb.index as usize;
                let entry_count = kb.entry_count as usize;
                // Decode the entry according to the layout of the current block.
                let GetKeyEntryResult { hash, key, ty, val } = match &kb.kind {
                    CurrentKeyBlockKind::Variable { offsets, hash_len } => {
                        get_key_entry(offsets, &kb.entries, entry_count, index, *hash_len)?
                    }
                    CurrentKeyBlockKind::Fixed {
                        hash_len,
                        key_size,
                        value_type,
                        stride,
                    } => get_fixed_key_entry(
                        &kb.entries,
                        index,
                        *hash_len,
                        *key_size,
                        *value_type,
                        *stride,
                    ),
                };
                // Blocks without stored hashes need the hash recomputed from the key.
                let full_hash = if hash.is_empty() {
                    crate::key::hash_key(&key)
                } else {
                    be::read_u64(hash)
                };
                let value = if ty == KEY_BLOCK_ENTRY_TYPE_MEDIUM {
                    // Medium values are handed out as the raw block (header fields plus
                    // payload) without checksum verification or decompression.
                    let block = be::read_u16(val);
                    let (uncompressed_size, checksum, block) =
                        get_raw_block_generic(&self.mmap, &self.meta, block)?;
                    IterValue::Medium {
                        uncompressed_size,
                        checksum,
                        block,
                    }
                } else {
                    // All other types are resolved like on the lookup path, reusing the last
                    // value block through the single-slot cache.
                    handle_key_match_generic(
                        &self.mmap,
                        &self.meta,
                        ty,
                        val,
                        &kb.entries,
                        &mut self.value_block_cache,
                    )?
                    .into()
                };
                let entry = LookupEntry {
                    hash: full_hash,
                    // `key` was sliced out of `kb.entries` by the decoding above.
                    key: unsafe { kb.entries.slice_from_subslice(key) },
                    value,
                };
                kb.index += 1;
                return Ok(Some(entry));
            }
            // Current block exhausted: load the next key block from the index, if any.
            if self.index_pos < self.num_index_entries {
                // With the type byte stripped, the block index of child `i` sits at
                // `i * INDEX_BLOCK_ENTRY_SIZE`.
                let base = self.index_pos * INDEX_BLOCK_ENTRY_SIZE;
                let block_index = be::read_u16(&self.index_entries[base..]);
                self.index_pos += 1;
                self.current_key_block =
                    Self::parse_key_block(&self.mmap, &self.meta, block_index)?;
            } else {
                return Ok(None);
            }
        }
    }
963}
964
/// A decoded key block entry; all slices borrow from the block's entry data.
struct GetKeyEntryResult<'l> {
    /// Stored hash bytes; empty when the block stores no hashes.
    hash: &'l [u8],
    /// The key bytes.
    key: &'l [u8],
    /// Entry type, one of the `KEY_BLOCK_ENTRY_TYPE_*` values or an inline type.
    ty: u8,
    /// Value bytes; their meaning depends on `ty`.
    val: &'l [u8],
}
971
972fn compare_hash_key<K: QueryKey>(
976 entry_hash: &[u8],
977 entry_key: &[u8],
978 full_hash: u64,
979 query_key: &K,
980) -> Ordering {
981 if entry_hash.is_empty() {
982 let entry_full_hash = crate::key::hash_key(&entry_key);
984 match full_hash.cmp(&entry_full_hash) {
985 Ordering::Equal => query_key.cmp(entry_key),
986 ord => ord,
987 }
988 } else {
989 let full_hash_bytes = full_hash.to_be_bytes();
991 match full_hash_bytes[..].cmp(entry_hash) {
992 Ordering::Equal => query_key.cmp(entry_key),
993 ord => ord,
994 }
995 }
996}
997
998fn entry_matches_key<K: QueryKey>(
1002 entry_hash: &[u8],
1003 entry_key: &[u8],
1004 full_hash: u64,
1005 query_key: &K,
1006) -> bool {
1007 if entry_hash.is_empty() {
1008 query_key.cmp(entry_key) == Ordering::Equal
1010 } else {
1011 full_hash.to_be_bytes()[..] == *entry_hash && query_key.cmp(entry_key) == Ordering::Equal
1013 }
1014}
1015
1016fn entry_val_size(ty: u8) -> Result<usize> {
1018 match ty {
1019 KEY_BLOCK_ENTRY_TYPE_SMALL => Ok(SMALL_VALUE_REF_SIZE),
1020 KEY_BLOCK_ENTRY_TYPE_MEDIUM => Ok(MEDIUM_VALUE_REF_SIZE),
1021 KEY_BLOCK_ENTRY_TYPE_BLOB => Ok(BLOB_VALUE_REF_SIZE),
1022 KEY_BLOCK_ENTRY_TYPE_DELETED => Ok(DELETED_VALUE_REF_SIZE),
1023 ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
1024 Ok((ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize)
1025 }
1026 _ => bail!("Invalid key block entry type: {ty}"),
1027 }
1028}
1029
1030#[inline(always)]
1033fn read_offset_entry(offsets: &[u8], index: usize) -> (u8, usize) {
1034 let base = index * 4;
1035 let word = be::read_u32(&offsets[base..]);
1036 let ty = (word >> 24) as u8;
1037 let offset = (word & 0x00FF_FFFF) as usize;
1038 (ty, offset)
1039}
1040
1041fn get_key_entry<'l>(
1043 offsets: &[u8],
1044 entries: &'l [u8],
1045 entry_count: usize,
1046 index: usize,
1047 hash_len: u8,
1048) -> Result<GetKeyEntryResult<'l>> {
1049 let hash_len_usize = hash_len as usize;
1050 let (ty, start) = read_offset_entry(offsets, index);
1051 let end = if index == entry_count - 1 {
1052 entries.len()
1053 } else {
1054 let (_, next_start) = read_offset_entry(offsets, index + 1);
1055 next_start
1056 };
1057 let hash = &entries[start..start + hash_len_usize];
1059 let val_size = entry_val_size(ty)?;
1060 Ok(GetKeyEntryResult {
1061 hash,
1062 key: &entries[start + hash_len_usize..end - val_size],
1063 ty,
1064 val: &entries[end - val_size..end],
1065 })
1066}
1067
1068fn get_fixed_key_entry<'l>(
1073 entries: &'l [u8],
1074 index: usize,
1075 hash_len: u8,
1076 key_size: usize,
1077 value_type: u8,
1078 stride: usize,
1079) -> GetKeyEntryResult<'l> {
1080 let hash_len_usize = hash_len as usize;
1081 let start = index * stride;
1082 GetKeyEntryResult {
1083 hash: &entries[start..start + hash_len_usize],
1084 key: &entries[start + hash_len_usize..start + hash_len_usize + key_size],
1085 ty: value_type,
1086 val: &entries[start + hash_len_usize + key_size..(index + 1) * stride],
1087 }
1088}