1use std::{
2 cmp::Ordering,
3 fs::File,
4 hash::BuildHasherDefault,
5 path::{Path, PathBuf},
6 sync::Arc,
7};
8
9use anyhow::{Context, Result, bail};
10use byteorder::{BE, ReadBytesExt};
11use memmap2::Mmap;
12use quick_cache::sync::GuardResult;
13use rustc_hash::FxHasher;
14use smallvec::SmallVec;
15
16use crate::{
17 QueryKey,
18 arc_bytes::ArcBytes,
19 compression::{checksum_block, decompress_into_arc},
20 constants::MAX_INLINE_VALUE_SIZE,
21 lookup_entry::{LazyLookupValue, LookupEntry, LookupValue},
22 mmap_helper::advise_mmap_for_persistence,
23 static_sorted_file_builder::BLOCK_HEADER_SIZE,
24};
25
/// Block type tag of an index block, which maps key hashes to child block indices.
pub const BLOCK_TYPE_INDEX: u8 = 0;
/// Block type tag of a key block with an offset table whose entries start with an 8-byte hash.
pub const BLOCK_TYPE_KEY_WITH_HASH: u8 = 1;
/// Block type tag of a key block with an offset table whose entries store no hash.
pub const BLOCK_TYPE_KEY_NO_HASH: u8 = 2;
/// Block type tag of a key block with fixed-size entries that start with an 8-byte hash.
pub const BLOCK_TYPE_FIXED_KEY_WITH_HASH: u8 = 3;
/// Block type tag of a key block with fixed-size entries that store no hash.
pub const BLOCK_TYPE_FIXED_KEY_NO_HASH: u8 = 4;
36
/// Entry type: the value lives inside a small value block and is referenced by
/// block index (u16), size (u16) and position (u32).
pub const KEY_BLOCK_ENTRY_TYPE_SMALL: u8 = 0;
/// Entry type: the value is a blob referenced by a u32 sequence number.
pub const KEY_BLOCK_ENTRY_TYPE_BLOB: u8 = 1;
/// Entry type: the entry is resolved to `LookupValue::Deleted` and carries no value bytes.
pub const KEY_BLOCK_ENTRY_TYPE_DELETED: u8 = 2;
/// Entry type: the value is an entire block referenced by its block index (u16).
pub const KEY_BLOCK_ENTRY_TYPE_MEDIUM: u8 = 3;
/// Entry types at or above this value store the value inline in the key block; the inline
/// length is `type - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN`.
pub const KEY_BLOCK_ENTRY_TYPE_INLINE_MIN: u8 = 8;
47
/// Size in bytes of the value reference of a small entry (u16 block + u16 size + u32 position).
pub(crate) const SMALL_VALUE_REF_SIZE: usize = 8;
/// Size in bytes of the value reference of a medium entry (u16 block index).
pub(crate) const MEDIUM_VALUE_REF_SIZE: usize = 2;
/// Size in bytes of the value reference of a blob entry (u32 sequence number).
pub(crate) const BLOB_VALUE_REF_SIZE: usize = 4;
/// Size in bytes of the value reference of a deleted entry (nothing is stored).
pub(crate) const DELETED_VALUE_REF_SIZE: usize = 0;
56
// Compile-time check: inline entries encode their value length in the type byte as
// `KEY_BLOCK_ENTRY_TYPE_INLINE_MIN + len`, so the largest inline value must fit into a `u8`.
const _: () = assert!(
    MAX_INLINE_VALUE_SIZE <= (u8::MAX - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize,
    "MAX_INLINE_VALUE_SIZE exceeds what can be encoded in key type byte"
);
63
/// Outcome of looking up a key in a single static sorted file.
pub enum SstLookupResult {
    /// The key was found. Holds one value for a regular lookup, or every matching entry's
    /// value when the lookup ran with `FIND_ALL`.
    Found(SmallVec<[LookupValue; 1]>),
    /// No entry in this file matches the key.
    NotFound,
}
71
72impl From<LookupValue> for SstLookupResult {
73 fn from(value: LookupValue) -> Self {
74 SstLookupResult::Found(smallvec::smallvec![value])
75 }
76}
77
78#[derive(Clone, Default)]
79pub struct BlockWeighter;
80
81impl quick_cache::Weighter<(u32, u16), ArcBytes> for BlockWeighter {
82 fn weight(&self, _key: &(u32, u16), val: &ArcBytes) -> u64 {
83 if val.is_mmap_backed() {
84 64
88 } else {
89 val.len() as u64 + 8
90 }
91 }
92}
93
/// Block cache keyed by `(u32, u16)`; the lookups in this file use
/// `(sequence_number, block_index)` as the key. Entries are weighted by [`BlockWeighter`]
/// and hashed with `FxHasher`.
pub type BlockCache =
    quick_cache::sync::Cache<(u32, u16), ArcBytes, BlockWeighter, BuildHasherDefault<FxHasher>>;
96
/// Source of small value blocks used while resolving a matched key entry.
///
/// Implementations return the block `block_index` of `sst`, reading it from the file when
/// they do not already hold it.
trait ValueBlockCache {
    /// Returns the small value block `block_index` of `sst`.
    fn get_or_read(self, sst: &StaticSortedFile, block_index: u16) -> Result<ArcBytes>;
}
105
106impl ValueBlockCache for &BlockCache {
107 fn get_or_read(self, sst: &StaticSortedFile, block_index: u16) -> Result<ArcBytes> {
108 let this = &sst;
109 let block = match self.get_value_or_guard(&(this.meta.sequence_number, block_index), None) {
110 GuardResult::Value(block) => block,
111 GuardResult::Guard(guard) => {
112 let block = this.read_small_value_block(block_index)?;
113 let _ = guard.insert(block.clone());
114 block
115 }
116 GuardResult::Timeout => unreachable!(),
117 };
118 Ok(block)
119 }
120}
121
122impl ValueBlockCache for &mut Option<(u16, ArcBytes)> {
123 fn get_or_read(self, sst: &StaticSortedFile, block_index: u16) -> Result<ArcBytes> {
124 if let Some((idx, block)) = self.as_ref()
125 && *idx == block_index
126 {
127 return Ok(block.clone());
128 }
129 let block = sst.read_small_value_block(block_index)?;
130 *self = Some((block_index, block.clone()));
131 Ok(block)
132 }
133}
134
/// Metadata needed to open and navigate a static sorted file.
#[derive(Clone, Copy, Debug)]
pub struct StaticSortedFileMetaData {
    /// Sequence number of the file; the file name is derived from it as `{:08}.sst`.
    pub sequence_number: u32,
    /// Number of blocks stored in the file.
    pub block_count: u16,
}

impl StaticSortedFileMetaData {
    /// Returns the byte position at which the block offsets table starts in a file of
    /// `sst_len` bytes. The table holds one `u32` per block and ends at the end of the file.
    pub fn block_offsets_start(&self, sst_len: usize) -> usize {
        let table_len = usize::from(self.block_count) * size_of::<u32>();
        sst_len - table_len
    }
}
149
/// A memory-mapped static sorted file (`.sst`) opened for reading.
pub struct StaticSortedFile {
    /// Sequence number and block count of the file.
    meta: StaticSortedFileMetaData,
    /// Memory map of the whole file; shared with blocks that are handed out without copying.
    mmap: Arc<Mmap>,
}
159
160impl StaticSortedFile {
161 pub fn open(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
164 let filename = format!("{:08}.sst", meta.sequence_number);
165 let path = db_path.join(&filename);
166 Self::open_internal(path, meta, false)
167 .with_context(|| format!("Unable to open static sorted file {filename}"))
168 }
169
170 pub fn open_for_compaction(db_path: &Path, meta: StaticSortedFileMetaData) -> Result<Self> {
174 let filename = format!("{:08}.sst", meta.sequence_number);
175 let path = db_path.join(&filename);
176 Self::open_internal(path, meta, true)
177 .with_context(|| format!("Unable to open static sorted file {filename}"))
178 }
179
180 fn open_internal(
181 path: PathBuf,
182 meta: StaticSortedFileMetaData,
183 sequential: bool,
184 ) -> Result<Self> {
185 let file = File::open(&path)
186 .with_context(|| format!("Failed to open SST file {}", path.display()))?;
187 let mmap = unsafe { Mmap::map(&file) }.with_context(|| {
188 format!(
189 "Failed to mmap SST file {} ({} bytes)",
190 path.display(),
191 file.metadata().map(|m| m.len()).unwrap_or(0)
192 )
193 })?;
194 #[cfg(unix)]
195 if sequential {
196 mmap.advise(memmap2::Advice::Sequential)?;
197 } else {
198 mmap.advise(memmap2::Advice::Random)?;
199 let offset = meta.block_offsets_start(mmap.len());
200 let _ = mmap.advise_range(memmap2::Advice::Sequential, offset, mmap.len() - offset);
201 }
202 advise_mmap_for_persistence(&mmap)?;
203 let file = Self {
204 meta,
205 mmap: Arc::new(mmap),
206 };
207 Ok(file)
208 }
209
210 pub fn try_into_iter(self) -> Result<StaticSortedFileIter> {
214 let block_count = self.meta.block_count;
215 let mut iter = StaticSortedFileIter {
216 this: self,
217 stack: Vec::new(),
218 current_key_block: None,
219 value_block_cache: None,
220 };
221 iter.enter_block(block_count - 1)?;
222 Ok(iter)
223 }
224
225 pub fn lookup<K: QueryKey, const FIND_ALL: bool>(
231 &self,
232 key_hash: u64,
233 key: &K,
234 key_block_cache: &BlockCache,
235 value_block_cache: &BlockCache,
236 ) -> Result<SstLookupResult> {
237 let mut current_block = self.meta.block_count - 1;
238 loop {
239 let mut key_block_arc = self.get_key_block(current_block, key_block_cache)?;
240 let block_type = key_block_arc.read_u8()?;
241 match block_type {
242 BLOCK_TYPE_INDEX => {
243 current_block = self.lookup_index_block(&key_block_arc, key_hash)?;
244 }
245 BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
246 let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH;
247 return self.lookup_key_block::<K, FIND_ALL>(
248 key_block_arc,
249 key_hash,
250 key,
251 has_hash,
252 value_block_cache,
253 );
254 }
255 BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
256 let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH;
257 return self.lookup_fixed_key_block::<K, FIND_ALL>(
258 key_block_arc,
259 key_hash,
260 key,
261 has_hash,
262 value_block_cache,
263 );
264 }
265 _ => {
266 bail!("Invalid block type");
267 }
268 }
269 }
270 }
271
272 fn lookup_index_block(&self, mut block: &[u8], hash: u64) -> Result<u16> {
274 let first_block = block.read_u16::<BE>()?;
275 let (entries, remainder) = block.as_chunks::<10>();
277 if entries.is_empty() {
278 return Ok(first_block);
279 }
280 if !remainder.is_empty() {
281 bail!("invalid index block, {} extra bytes", remainder.len())
282 }
283 match entries.binary_search_by(|entry| (&entry[..]).read_u64::<BE>().unwrap().cmp(&hash)) {
284 Ok(i) => Ok((&entries[i][8..]).read_u16::<BE>()?),
285 Err(0) => Ok(first_block),
286 Err(i) => Ok((&entries[i - 1][8..]).read_u16::<BE>()?),
287 }
288 }
289
290 fn lookup_key_block<K: QueryKey, const FIND_ALL: bool>(
295 &self,
296 mut block: ArcBytes,
297 key_hash: u64,
298 key: &K,
299 has_hash: bool,
300 value_block_cache: &BlockCache,
301 ) -> Result<SstLookupResult> {
302 let hash_len: u8 = if has_hash { 8 } else { 0 };
303 let entry_count = block.read_u24::<BE>()? as usize;
304 let offsets = &block[..entry_count * 4];
305 let entries = &block[entry_count * 4..];
306
307 self.lookup_block_inner::<K, FIND_ALL>(
308 &block,
309 entry_count,
310 key_hash,
311 key,
312 value_block_cache,
313 |i| get_key_entry(offsets, entries, entry_count, i, hash_len),
314 )
315 }
316
317 fn lookup_fixed_key_block<K: QueryKey, const FIND_ALL: bool>(
322 &self,
323 mut block: ArcBytes,
324 key_hash: u64,
325 key: &K,
326 has_hash: bool,
327 value_block_cache: &BlockCache,
328 ) -> Result<SstLookupResult> {
329 let hash_len: u8 = if has_hash { 8 } else { 0 };
330 let entry_count = block.read_u24::<BE>()? as usize;
331 let key_size = block.read_u8()? as usize;
332 let value_type = block.read_u8()?;
333 let val_size = entry_val_size(value_type)?;
334 let stride = hash_len as usize + key_size + val_size;
335 let entries = &block[..];
336
337 self.lookup_block_inner::<K, FIND_ALL>(
338 &block,
339 entry_count,
340 key_hash,
341 key,
342 value_block_cache,
343 |i| {
344 Ok(get_fixed_key_entry(
345 entries, i, hash_len, key_size, value_type, stride,
346 ))
347 },
348 )
349 }
350
351 fn lookup_block_inner<'a, K: QueryKey, const FIND_ALL: bool>(
356 &self,
357 block: &ArcBytes,
358 entry_count: usize,
359 key_hash: u64,
360 key: &K,
361 value_block_cache: &BlockCache,
362 get_entry: impl Fn(usize) -> Result<GetKeyEntryResult<'a>>,
363 ) -> Result<SstLookupResult> {
364 let mut l = 0;
365 let mut r = entry_count;
366 while l < r {
368 let m = (l + r) / 2;
369 let GetKeyEntryResult {
370 hash: mid_hash,
371 key: mid_key,
372 ty,
373 val,
374 } = get_entry(m)?;
375
376 let comparison = compare_hash_key(mid_hash, mid_key, key_hash, key);
377
378 match comparison {
379 Ordering::Less => r = m,
380 Ordering::Equal => {
381 if !FIND_ALL {
382 let result = self.handle_key_match(ty, val, block, value_block_cache)?;
385 return Ok(SstLookupResult::Found(SmallVec::from_buf([result])));
386 }
387 let mut results = SmallVec::new();
393 for i in (l..m).rev() {
394 let GetKeyEntryResult {
395 hash,
396 key: entry_key,
397 ty,
398 val,
399 } = get_entry(i)?;
400 if !entry_matches_key(hash, entry_key, key_hash, key) {
401 break;
402 }
403 results.push(self.handle_key_match(ty, val, block, value_block_cache)?);
404 }
405 results.push(self.handle_key_match(ty, val, block, value_block_cache)?);
414 for i in (m + 1)..r {
415 let GetKeyEntryResult {
416 hash,
417 key: entry_key,
418 ty,
419 val,
420 } = get_entry(i)?;
421 if !entry_matches_key(hash, entry_key, key_hash, key) {
422 break;
423 }
424 results.push(self.handle_key_match(ty, val, block, value_block_cache)?);
425 }
426 return Ok(SstLookupResult::Found(results));
427 }
428 Ordering::Greater => l = m + 1,
429 }
430 }
431
432 Ok(SstLookupResult::NotFound)
433 }
434
435 fn handle_key_match(
437 &self,
438 ty: u8,
439 mut val: &[u8],
440 key_block_arc: &ArcBytes,
441 value_block_cache: impl ValueBlockCache,
442 ) -> Result<LookupValue> {
443 Ok(match ty {
444 KEY_BLOCK_ENTRY_TYPE_SMALL => {
445 let block = val.read_u16::<BE>()?;
446 let size = val.read_u16::<BE>()? as usize;
447 let position = val.read_u32::<BE>()? as usize;
448 let value = value_block_cache
449 .get_or_read(self, block)?
450 .slice(position..position + size);
451 LookupValue::Slice { value }
452 }
453 KEY_BLOCK_ENTRY_TYPE_MEDIUM => {
454 let block = val.read_u16::<BE>()?;
455 let value = self.read_value_block(block)?;
456 LookupValue::Slice { value }
457 }
458 KEY_BLOCK_ENTRY_TYPE_BLOB => {
459 let sequence_number = val.read_u32::<BE>()?;
460 LookupValue::Blob { sequence_number }
461 }
462 KEY_BLOCK_ENTRY_TYPE_DELETED => LookupValue::Deleted,
463 _ => {
464 let value = unsafe { key_block_arc.slice_from_subslice(val) };
467 LookupValue::Slice { value }
468 }
469 })
470 }
471
472 fn get_key_block(
474 &self,
475 block: u16,
476 key_block_cache: &BlockCache,
477 ) -> Result<ArcBytes, anyhow::Error> {
478 Ok(
479 match key_block_cache.get_value_or_guard(&(self.meta.sequence_number, block), None) {
480 GuardResult::Value(block) => block,
481 GuardResult::Guard(guard) => {
482 let block = self.read_key_block(block)?;
483 let _ = guard.insert(block.clone());
484 block
485 }
486 GuardResult::Timeout => unreachable!(),
487 },
488 )
489 }
490
    /// Reads the key block `block_index`; delegates to `read_block`.
    fn read_key_block(&self, block_index: u16) -> Result<ArcBytes> {
        self.read_block(block_index)
    }
495
    /// Reads the small value block `block_index`; delegates to `read_block`.
    fn read_small_value_block(&self, block_index: u16) -> Result<ArcBytes> {
        self.read_block(block_index)
    }
500
    /// Reads the value block `block_index` (used for medium values); delegates to `read_block`.
    fn read_value_block(&self, block_index: u16) -> Result<ArcBytes> {
        self.read_block(block_index)
    }
505
506 fn verify_checksum(&self, data: &[u8], expected: u32, block_index: u16) -> Result<()> {
508 let actual = checksum_block(data);
509 if actual != expected {
510 bail!(
511 "Cache corruption detected: checksum mismatch in block {} of {:08}.sst (expected \
512 {:08x}, got {:08x})",
513 block_index,
514 self.meta.sequence_number,
515 expected,
516 actual
517 );
518 }
519 Ok(())
520 }
521
522 #[tracing::instrument(level = "info", name = "reading database block", skip_all)]
527 fn read_block(&self, block_index: u16) -> Result<ArcBytes> {
528 let (uncompressed_length, expected_checksum, block) =
529 self.get_raw_block_slice(block_index).with_context(|| {
530 format!(
531 "Failed to read raw block {} from {:08}.sst",
532 block_index, self.meta.sequence_number
533 )
534 })?;
535
536 self.verify_checksum(block, expected_checksum, block_index)?;
538
539 if uncompressed_length == 0 {
541 return Ok(self.mmap_slice_to_arc_bytes(block));
542 }
543
544 #[cfg(unix)]
549 let _ = self.mmap.advise_range(
550 memmap2::Advice::Sequential,
551 block.as_ptr() as usize - self.mmap.as_ptr() as usize,
552 block.len(),
553 );
554
555 let buffer = decompress_into_arc(uncompressed_length, block).with_context(|| {
556 format!(
557 "Failed to decompress block {} from {:08}.sst ({} bytes uncompressed)",
558 block_index, self.meta.sequence_number, uncompressed_length
559 )
560 })?;
561 Ok(ArcBytes::from(buffer))
562 }
563
564 fn get_raw_block(&self, block_index: u16) -> Result<(u32, u32, ArcBytes)> {
568 let (uncompressed_length, checksum, block) = self.get_raw_block_slice(block_index)?;
569 Ok((
570 uncompressed_length,
571 checksum,
572 self.mmap_slice_to_arc_bytes(block),
573 ))
574 }
575
    /// Wraps `subslice` in an [`ArcBytes`], passing along a clone of this file's `Arc<Mmap>`.
    fn mmap_slice_to_arc_bytes(&self, subslice: &[u8]) -> ArcBytes {
        // NOTE(review): presumably `from_mmap` requires `subslice` to point into `self.mmap`;
        // the callers in this file pass slices returned by `get_raw_block_slice` — confirm
        // against `ArcBytes::from_mmap`.
        unsafe { ArcBytes::from_mmap(self.mmap.clone(), subslice) }
    }
581
582 fn get_raw_block_slice(&self, block_index: u16) -> Result<(u32, u32, &[u8])> {
585 #[cfg(feature = "strict_checks")]
586 if block_index >= self.meta.block_count {
587 bail!(
588 "Corrupted file seq:{} block:{} > number of blocks {} (block_offsets: {:x})",
589 self.meta.sequence_number,
590 block_index,
591 self.meta.block_count,
592 self.meta.block_offsets_start(self.mmap.len()),
593 );
594 }
595 let offset = self.meta.block_offsets_start(self.mmap.len()) + block_index as usize * 4;
596 #[cfg(feature = "strict_checks")]
597 if offset + 4 > self.mmap.len() {
598 bail!(
599 "Corrupted file seq:{} block:{} block offset locations {} + 4 bytes > file end {} \
600 (block_offsets: {:x})",
601 self.meta.sequence_number,
602 block_index,
603 offset,
604 self.mmap.len(),
605 self.meta.block_offsets_start(self.mmap.len()),
606 );
607 }
608 let block_start = if block_index == 0 {
609 0
610 } else {
611 (&self.mmap[offset - 4..offset])
612 .read_u32::<BE>()
613 .with_context(|| {
614 format!(
615 "Failed to read block_start offset for block {} in {:08}.sst",
616 block_index, self.meta.sequence_number
617 )
618 })? as usize
619 };
620 let block_end = (&self.mmap[offset..offset + 4])
621 .read_u32::<BE>()
622 .with_context(|| {
623 format!(
624 "Failed to read block_end offset for block {} in {:08}.sst",
625 block_index, self.meta.sequence_number
626 )
627 })? as usize;
628 #[cfg(feature = "strict_checks")]
629 if block_end > self.mmap.len() || block_start > self.mmap.len() {
630 bail!(
631 "Corrupted file seq:{} block:{} block {} - {} > file end {} (block_offsets: {:x})",
632 self.meta.sequence_number,
633 block_index,
634 block_start,
635 block_end,
636 self.mmap.len(),
637 self.meta.block_offsets_start(self.mmap.len()),
638 );
639 }
640 let uncompressed_length = u32::from_be_bytes(
641 self.mmap[block_start..block_start + 4]
642 .try_into()
643 .with_context(|| {
644 format!(
645 "Failed to read uncompressed_length from block {} header in {:08}.sst",
646 block_index, self.meta.sequence_number
647 )
648 })?,
649 );
650 let checksum = u32::from_be_bytes(
651 self.mmap[block_start + 4..block_start + 8]
652 .try_into()
653 .with_context(|| {
654 format!(
655 "Failed to read checksum from block {} header in {:08}.sst",
656 block_index, self.meta.sequence_number
657 )
658 })?,
659 );
660 let block = &self.mmap[block_start + BLOCK_HEADER_SIZE..block_end];
661 Ok((uncompressed_length, checksum, block))
662 }
663}
664
/// Iterator over all entries of a [`StaticSortedFile`], created by
/// `StaticSortedFile::try_into_iter`.
pub struct StaticSortedFileIter {
    /// The file being iterated.
    this: StaticSortedFile,

    /// Index blocks that still have child blocks left to visit.
    stack: Vec<CurrentIndexBlock>,
    /// Key block that is currently being iterated, if any.
    current_key_block: Option<CurrentKeyBlock>,
    /// Most recently read small value block together with its block index.
    value_block_cache: Option<(u16, ArcBytes)>,
}
676
/// Layout-specific state of the key block that is currently being iterated.
enum CurrentKeyBlockKind {
    /// Key block with an offset table: `offsets` holds 4 bytes per entry, `hash_len` is the
    /// number of hash bytes at the start of each entry (8 or 0).
    Variable { offsets: ArcBytes, hash_len: u8 },
    /// Key block whose entries all have the same size.
    Fixed {
        /// Number of hash bytes at the start of each entry (8 or 0).
        hash_len: u8,
        /// Size of every key in bytes.
        key_size: usize,
        /// Entry type shared by all entries of the block.
        value_type: u8,
        /// Size of a whole entry in bytes (`hash_len + key_size + value size`).
        stride: usize,
    },
}
688
/// Iteration state of a key block.
struct CurrentKeyBlock {
    /// Layout-specific data needed to decode entries.
    kind: CurrentKeyBlockKind,
    /// The entry data of the block (everything after the block header).
    entries: ArcBytes,
    /// Number of entries in the block.
    entry_count: usize,
    /// Index of the next entry to yield.
    index: usize,
}
695
/// Iteration state of an index block.
struct CurrentIndexBlock {
    /// The index block content after the block type byte; a child block index is read at
    /// every multiple of 10 bytes.
    entries: ArcBytes,
    /// Number of child block indices in the block.
    block_indices_count: usize,
    /// Position of the next child block to visit.
    index: usize,
}
701
702impl Iterator for StaticSortedFileIter {
703 type Item = Result<LookupEntry>;
704
705 fn next(&mut self) -> Option<Self::Item> {
706 self.next_internal().transpose()
707 }
708}
709
710impl StaticSortedFileIter {
711 fn enter_block(&mut self, block_index: u16) -> Result<()> {
713 let block_arc = self.this.read_key_block(block_index)?;
714 let mut block = &*block_arc;
715 let block_type = block.read_u8()?;
716 match block_type {
717 BLOCK_TYPE_INDEX => {
718 let block_indices_count = (block.len() + 8) / 10;
719 let range = 1..block_arc.len();
720 self.stack.push(CurrentIndexBlock {
721 entries: block_arc.slice(range),
722 block_indices_count,
723 index: 0,
724 });
725 }
726 BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => {
727 let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH;
728 let hash_len = if has_hash { 8 } else { 0 };
729 let entry_count = block.read_u24::<BE>()? as usize;
730 let offsets_range = 4..4 + entry_count * 4;
731 let entries_range = 4 + entry_count * 4..block_arc.len();
732 let offsets = block_arc.clone().slice(offsets_range);
733 let entries = block_arc.slice(entries_range);
734 self.current_key_block = Some(CurrentKeyBlock {
735 kind: CurrentKeyBlockKind::Variable { offsets, hash_len },
736 entries,
737 entry_count,
738 index: 0,
739 });
740 }
741 BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => {
742 let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH;
743 let hash_len = if has_hash { 8 } else { 0 };
744 let entry_count = block.read_u24::<BE>()? as usize;
745 let key_size = block.read_u8()? as usize;
746 let value_type = block.read_u8()?;
747 let val_size = entry_val_size(value_type)?;
748 let stride = hash_len as usize + key_size + val_size;
749 let entries_range = 6..block_arc.len();
751 let entries = block_arc.slice(entries_range);
752 self.current_key_block = Some(CurrentKeyBlock {
753 kind: CurrentKeyBlockKind::Fixed {
754 hash_len,
755 key_size,
756 value_type,
757 stride,
758 },
759 entries,
760 entry_count,
761 index: 0,
762 });
763 }
764 _ => {
765 bail!("Invalid block type");
766 }
767 }
768 Ok(())
769 }
770
    /// Advances the iterator and returns the next entry, or `None` once all blocks are done.
    ///
    /// Entries of the current key block are yielded first. When no key block is active, the
    /// next child of the topmost index block on the stack is entered.
    fn next_internal(&mut self) -> Result<Option<LookupEntry>> {
        loop {
            // The key block is taken out of `self` and only put back if entries remain.
            if let Some(CurrentKeyBlock {
                kind,
                entries,
                entry_count,
                index,
            }) = self.current_key_block.take()
            {
                let GetKeyEntryResult { hash, key, ty, val } = match &kind {
                    CurrentKeyBlockKind::Variable { offsets, hash_len } => {
                        get_key_entry(offsets, &entries, entry_count, index, *hash_len)?
                    }
                    CurrentKeyBlockKind::Fixed {
                        hash_len,
                        key_size,
                        value_type,
                        stride,
                    } => get_fixed_key_entry(
                        &entries,
                        index,
                        *hash_len,
                        *key_size,
                        *value_type,
                        *stride,
                    ),
                };
                // Blocks without stored hashes need the hash recomputed from the key.
                let full_hash = if hash.is_empty() {
                    crate::key::hash_key(&key)
                } else {
                    u64::from_be_bytes(hash.try_into().unwrap())
                };
                let value = if ty == KEY_BLOCK_ENTRY_TYPE_MEDIUM {
                    // Medium values are handed out as the raw block (no checksum verification
                    // or decompression happens here).
                    let mut val = val;
                    let block = val.read_u16::<BE>()?;
                    let (uncompressed_size, checksum, block) = self.this.get_raw_block(block)?;
                    LazyLookupValue::Medium {
                        uncompressed_size,
                        checksum,
                        block,
                    }
                } else {
                    let value = self.this.handle_key_match(
                        ty,
                        val,
                        &entries,
                        &mut self.value_block_cache,
                    )?;
                    LazyLookupValue::Eager(value)
                };
                let entry = LookupEntry {
                    hash: full_hash,
                    // NOTE(review): `key` was sliced out of `entries` above; presumably that is
                    // the contract of `slice_from_subslice` — confirm against `ArcBytes`.
                    key: unsafe { entries.slice_from_subslice(key) },
                    value,
                };
                if index + 1 < entry_count {
                    self.current_key_block = Some(CurrentKeyBlock {
                        kind,
                        entries,
                        entry_count,
                        index: index + 1,
                    });
                }
                return Ok(Some(entry));
            }
            if let Some(CurrentIndexBlock {
                entries,
                block_indices_count,
                index,
            }) = self.stack.pop()
            {
                // Child block indices are located at every multiple of 10 bytes.
                let block_index = (&entries[index * 10..]).read_u16::<BE>()?;
                // Keep the index block on the stack while it has children left.
                if index + 1 < block_indices_count {
                    self.stack.push(CurrentIndexBlock {
                        entries,
                        block_indices_count,
                        index: index + 1,
                    });
                }
                self.enter_block(block_index)?;
            } else {
                return Ok(None);
            }
        }
    }
859}
860
/// A decoded key block entry; all slices borrow from the key block.
struct GetKeyEntryResult<'l> {
    /// Stored hash bytes of the entry; empty when the block stores no hashes.
    hash: &'l [u8],
    /// Key bytes of the entry.
    key: &'l [u8],
    /// Entry type (one of the `KEY_BLOCK_ENTRY_TYPE_*` values or an inline type).
    ty: u8,
    /// Value part of the entry: a value reference or the inline value, depending on `ty`.
    val: &'l [u8],
}
867
868fn compare_hash_key<K: QueryKey>(
872 entry_hash: &[u8],
873 entry_key: &[u8],
874 full_hash: u64,
875 query_key: &K,
876) -> Ordering {
877 if entry_hash.is_empty() {
878 let entry_full_hash = crate::key::hash_key(&entry_key);
880 match full_hash.cmp(&entry_full_hash) {
881 Ordering::Equal => query_key.cmp(entry_key),
882 ord => ord,
883 }
884 } else {
885 let full_hash_bytes = full_hash.to_be_bytes();
887 match full_hash_bytes[..].cmp(entry_hash) {
888 Ordering::Equal => query_key.cmp(entry_key),
889 ord => ord,
890 }
891 }
892}
893
894fn entry_matches_key<K: QueryKey>(
898 entry_hash: &[u8],
899 entry_key: &[u8],
900 full_hash: u64,
901 query_key: &K,
902) -> bool {
903 if entry_hash.is_empty() {
904 query_key.cmp(entry_key) == Ordering::Equal
906 } else {
907 full_hash.to_be_bytes()[..] == *entry_hash && query_key.cmp(entry_key) == Ordering::Equal
909 }
910}
911
912fn entry_val_size(ty: u8) -> Result<usize> {
914 match ty {
915 KEY_BLOCK_ENTRY_TYPE_SMALL => Ok(SMALL_VALUE_REF_SIZE),
916 KEY_BLOCK_ENTRY_TYPE_MEDIUM => Ok(MEDIUM_VALUE_REF_SIZE),
917 KEY_BLOCK_ENTRY_TYPE_BLOB => Ok(BLOB_VALUE_REF_SIZE),
918 KEY_BLOCK_ENTRY_TYPE_DELETED => Ok(DELETED_VALUE_REF_SIZE),
919 ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
920 Ok((ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize)
921 }
922 _ => bail!("Invalid key block entry type: {ty}"),
923 }
924}
925
926fn get_key_entry<'l>(
928 offsets: &[u8],
929 entries: &'l [u8],
930 entry_count: usize,
931 index: usize,
932 hash_len: u8,
933) -> Result<GetKeyEntryResult<'l>> {
934 let hash_len_usize = hash_len as usize;
935 let mut offset = &offsets[index * 4..];
936 let ty = offset.read_u8()?;
937 let start = offset.read_u24::<BE>()? as usize;
938 let end = if index == entry_count - 1 {
939 entries.len()
940 } else {
941 (&offsets[(index + 1) * 4 + 1..]).read_u24::<BE>()? as usize
942 };
943 let hash = &entries[start..start + hash_len_usize];
945 let val_size = entry_val_size(ty)?;
946 Ok(GetKeyEntryResult {
947 hash,
948 key: &entries[start + hash_len_usize..end - val_size],
949 ty,
950 val: &entries[end - val_size..end],
951 })
952}
953
954fn get_fixed_key_entry<'l>(
959 entries: &'l [u8],
960 index: usize,
961 hash_len: u8,
962 key_size: usize,
963 value_type: u8,
964 stride: usize,
965) -> GetKeyEntryResult<'l> {
966 let hash_len_usize = hash_len as usize;
967 let start = index * stride;
968 GetKeyEntryResult {
969 hash: &entries[start..start + hash_len_usize],
970 key: &entries[start + hash_len_usize..start + hash_len_usize + key_size],
971 ty: value_type,
972 val: &entries[start + hash_len_usize + key_size..(index + 1) * stride],
973 }
974}