use std::{
    borrow::Cow,
    collections::VecDeque,
    fs::File,
    io::{BufWriter, Write},
    path::Path,
};

use anyhow::{Context, Result};
use byteorder::{BE, ByteOrder, WriteBytesExt};

use crate::{
    compression::{checksum_block, compress_into_buffer},
    constants::{MAX_INLINE_VALUE_SIZE, MAX_SMALL_VALUE_SIZE, MIN_SMALL_VALUE_BLOCK_SIZE},
    meta_file::MetaEntryFlags,
    static_sorted_file::{
        BLOB_VALUE_REF_SIZE, BLOCK_TYPE_FIXED_KEY_NO_HASH, BLOCK_TYPE_FIXED_KEY_WITH_HASH,
        BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY_NO_HASH, BLOCK_TYPE_KEY_WITH_HASH, DELETED_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, KEY_BLOCK_ENTRY_TYPE_INLINE_MIN,
        KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, MEDIUM_VALUE_REF_SIZE,
        SMALL_VALUE_REF_SIZE,
    },
};

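// Every block written to the file is preceded by an 8 byte header: a big-endian u32
// uncompressed size (0 when the block is stored uncompressed) followed by a big-endian
// u32 checksum of the stored bytes. See `write_raw_block_to_file` below.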
pub const BLOCK_HEADER_SIZE: usize = 8;

const MAX_KEY_BLOCK_ENTRIES: usize = MAX_KEY_BLOCK_SIZE / KEY_BLOCK_ENTRY_META_OVERHEAD;
const MAX_KEY_BLOCK_SIZE: usize = 16 * 1024;
const KEY_BLOCK_ENTRY_META_OVERHEAD: usize = 20;
const AMQF_FALSE_POSITIVE_RATE: f64 = 0.01;
const AVG_SMALL_VALUE_SIZE: usize = 64;

const BLOCK_INDEX_CAPACITY_BUFFER: usize = 16;

const MIN_KEY_SIZE_FOR_COMPRESSION: usize = 16;

const MAX_FIXED_KEY_LEN: usize = u8::MAX as usize;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EntryType(u8);

#[derive(Clone, Copy)]
enum KeyBlockFormat {
    Unknown,
    Fixed { key_len: u8, value_type: EntryType },
    Variable,
}

impl KeyBlockFormat {
    fn update(&mut self, key_len: usize, value_type: EntryType) {
        *self = match *self {
            KeyBlockFormat::Unknown => {
                if key_len <= MAX_FIXED_KEY_LEN {
                    KeyBlockFormat::Fixed {
                        key_len: key_len as u8,
                        value_type,
                    }
                } else {
                    KeyBlockFormat::Variable
                }
            }
            KeyBlockFormat::Fixed {
                key_len: k,
                value_type: v,
            } if k as usize == key_len && v == value_type => KeyBlockFormat::Fixed {
                key_len: k,
                value_type: v,
            },
            KeyBlockFormat::Fixed { .. } | KeyBlockFormat::Variable => KeyBlockFormat::Variable,
        };
    }
}

#[derive(Clone, Copy)]
struct KeyBlockFlushInfo {
    max_key_len: usize,
    format: KeyBlockFormat,
}

struct KeyBlockAccumulator {
    size: usize,
    entry_count: usize,
    max_key_len: usize,
    last_hash: u64,
    format: KeyBlockFormat,
}

impl KeyBlockAccumulator {
    fn new() -> Self {
        Self {
            size: 0,
            entry_count: 0,
            max_key_len: 0,
            last_hash: 0,
            format: KeyBlockFormat::Unknown,
        }
    }

    fn add(&mut self, key_len: usize, key_hash: u64, value_type: EntryType) {
        self.size += key_len + KEY_BLOCK_ENTRY_META_OVERHEAD;
        self.max_key_len = self.max_key_len.max(key_len);
        self.entry_count += 1;
        self.last_hash = key_hash;
        self.format.update(key_len, value_type);
    }

    fn flush_info(&self) -> KeyBlockFlushInfo {
        KeyBlockFlushInfo {
            max_key_len: self.max_key_len,
            format: self.format,
        }
    }

    fn should_flush(&self, next_key_len: usize, next_key_hash: u64) -> bool {
        if self.entry_count == 0 {
            return false;
        }
        let would_exceed_size =
            self.size + next_key_len + KEY_BLOCK_ENTRY_META_OVERHEAD > MAX_KEY_BLOCK_SIZE;
        let would_exceed_entries = self.entry_count >= MAX_KEY_BLOCK_ENTRIES;
        (would_exceed_size || would_exceed_entries) && self.last_hash != next_key_hash
    }

    fn reset(&mut self) {
        self.size = 0;
        self.entry_count = 0;
        self.max_key_len = 0;
        self.format = KeyBlockFormat::Unknown;
    }
}

fn use_hash(max_key_len: usize) -> bool {
    max_key_len > 32
}

pub trait Entry {
    fn key_hash(&self) -> u64;
    fn key_len(&self) -> usize;
    fn write_key_to(&self, buf: &mut Vec<u8>);

    fn value(&self) -> EntryValue<'_>;
}

impl<E: Entry> Entry for &E {
    fn key_hash(&self) -> u64 {
        (*self).key_hash()
    }
    fn key_len(&self) -> usize {
        (*self).key_len()
    }
    fn write_key_to(&self, buf: &mut Vec<u8>) {
        (*self).write_key_to(buf)
    }
    fn value(&self) -> EntryValue<'_> {
        (*self).value()
    }
}

#[derive(Copy, Clone)]
pub enum EntryValue<'l> {
    Inline { value: &'l [u8] },
    Small { value: &'l [u8] },
    Medium { value: &'l [u8] },
    MediumRaw {
        uncompressed_size: u32,
        checksum: u32,
        block: &'l [u8],
    },
    Large { blob: u32 },
    Deleted,
}

#[derive(Debug, Clone)]
pub struct StaticSortedFileBuilderMeta<'a> {
    pub min_hash: u64,
    pub max_hash: u64,
    pub amqf: Cow<'a, [u8]>,
    pub block_count: u16,
    pub size: u64,
    pub flags: MetaEntryFlags,
    pub entries: u64,
}

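/// Writes a complete, already hash-sorted batch of entries to `file` in one call by
/// feeding them through a `StreamingSstWriter`.
///
/// Illustrative usage (a sketch mirroring the tests below; the `Entry` implementation
/// and error handling are up to the caller):
///
/// ```ignore
/// let (meta, file) = write_static_stored_file(&entries, &path, MetaEntryFlags::default())?;
/// ```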
pub fn write_static_stored_file<E: Entry>(
    entries: &[E],
    file: &Path,
    flags: MetaEntryFlags,
) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
    debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted());
    let mut writer = StreamingSstWriter::new(file, flags, entries.len() as u64)?;
    for entry in entries {
        writer.add(entry)?;
    }
    writer.close()
}

fn write_raw_block_to_file(
    file: &mut BufWriter<File>,
    block_offsets: &mut Vec<u32>,
    uncompressed_size: u32,
    checksum: u32,
    block: &[u8],
) -> Result<u16> {
    let block_index: u16 = block_offsets
        .len()
        .try_into()
        .expect("Block index overflow");

    let len: u32 = (block.len() + BLOCK_HEADER_SIZE).try_into().unwrap();
    let offset = block_offsets
        .last()
        .copied()
        .unwrap_or_default()
        .checked_add(len)
        .expect("Block offset overflow");
    block_offsets.push(offset);

    file.write_u32::<BE>(uncompressed_size)
        .context("Failed to write uncompressed size")?;
    file.write_u32::<BE>(checksum)
        .context("Failed to write checksum")?;
    file.write_all(block)
        .context("Failed to write block data")?;
    Ok(block_index)
}

fn write_block_to_file(
    file: &mut BufWriter<File>,
    compress_buffer: &mut Vec<u8>,
    block_offsets: &mut Vec<u32>,
    block: &[u8],
    try_compress: bool,
) -> Result<u16> {
    let (uncompressed_size, data_to_write): (u32, &[u8]) = if try_compress {
        compress_into_buffer(block, compress_buffer)?;
        if compress_buffer.len() < block.len() - (block.len() / 8) {
            (block.len().try_into().unwrap(), compress_buffer.as_slice())
        } else {
            (0, block)
        }
    } else {
        (0, block)
    };

    let checksum = checksum_block(data_to_write);

    let result = write_raw_block_to_file(
        file,
        block_offsets,
        uncompressed_size,
        checksum,
        data_to_write,
    );
    compress_buffer.clear();
    result
}

enum ValueRef {
    Small {
        block_index: u16,
        offset: u32,
        size: u16,
    },
    PendingSmall {
        #[cfg(debug_assertions)]
        small_block_id: u16,
        offset: u32,
        size: u16,
    },
    Medium { block_index: u16 },
    Inline {
        data: [u8; MAX_INLINE_VALUE_SIZE],
        len: u8,
    },
    Blob { blob_id: u32 },
    Deleted,
}

impl ValueRef {
    fn entry_type(&self) -> EntryType {
        EntryType(match self {
            ValueRef::Small { .. } | ValueRef::PendingSmall { .. } => KEY_BLOCK_ENTRY_TYPE_SMALL,
            ValueRef::Medium { .. } => KEY_BLOCK_ENTRY_TYPE_MEDIUM,
            ValueRef::Inline { len, .. } => KEY_BLOCK_ENTRY_TYPE_INLINE_MIN + *len,
            ValueRef::Blob { .. } => KEY_BLOCK_ENTRY_TYPE_BLOB,
            ValueRef::Deleted => KEY_BLOCK_ENTRY_TYPE_DELETED,
        })
    }

    fn write_value_to(&self, buffer: &mut Vec<u8>) {
        match self {
            ValueRef::Small {
                block_index,
                offset,
                size,
            } => {
                let mut scratch = [0u8; 8];
                BE::write_u16(&mut scratch, *block_index);
                BE::write_u16(&mut scratch[2..], *size);
                BE::write_u32(&mut scratch[4..], *offset);
                buffer.extend(&scratch);
            }
            ValueRef::Medium { block_index } => {
                let mut scratch = [0u8; 2];
                BE::write_u16(&mut scratch, *block_index);
                buffer.extend(scratch);
            }
            ValueRef::Inline { data, len } => {
                buffer.extend(&data[..*len as usize]);
            }
            ValueRef::Blob { blob_id } => {
                let mut scratch = [0u8; 4];
                BE::write_u32(&mut scratch, *blob_id);
                buffer.extend(scratch);
            }
            ValueRef::Deleted => {}
            ValueRef::PendingSmall { .. } => {
                unreachable!("PendingSmall should have been resolved");
            }
        }
    }
}

struct PendingEntry<E> {
    entry: E,
    value_ref: ValueRef,
}

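/// Incrementally builds a static sorted file from entries that are added in ascending
/// key-hash order. Medium values are written to their own blocks immediately, small
/// values are batched into shared value blocks, and key entries are buffered until every
/// small value they reference has been assigned a block index. `close()` appends the
/// index block and the block offset table and builds the AMQF filter.
///
/// Illustrative usage (a sketch mirroring the tests below; error handling elided):
///
/// ```ignore
/// let mut writer = StreamingSstWriter::new(&path, MetaEntryFlags::default(), entries.len() as u64)?;
/// for entry in entries {
///     writer.add(entry)?;
/// }
/// let (meta, file) = writer.close()?;
/// ```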
pub struct StreamingSstWriter<E: Entry> {
    file: Option<BufWriter<File>>,
    compress_buffer: Vec<u8>,
    block_offsets: Vec<u32>,

    pending_keys: VecDeque<PendingEntry<E>>,

    first_pending_small_index: usize,

    #[cfg(debug_assertions)]
    current_small_block_id: u16,

    pending_small_value_block: Vec<u8>,

    key_buffer: Vec<u8>,

    collected_fingerprints: Vec<u32>,

    key_block_boundaries: Vec<(u64, u16)>,

    min_hash: u64,
    max_hash: u64,
    entry_count: u64,
    flags: MetaEntryFlags,

    total_key_size: usize,
    total_value_size: usize,

    pending_key_total_size: usize,

    current_key_block: KeyBlockAccumulator,

    #[cfg(debug_assertions)]
    finished: bool,
}

impl<E: Entry> StreamingSstWriter<E> {
    pub fn new(file: &Path, flags: MetaEntryFlags, max_entry_count: u64) -> Result<Self> {
        let file = BufWriter::new(File::create(file)?);

        let estimated_key_blocks = (max_entry_count as usize)
            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
            .max(1);
        let entries_per_value_block = MIN_SMALL_VALUE_BLOCK_SIZE / AVG_SMALL_VALUE_SIZE;
        let estimated_value_blocks = (max_entry_count as usize)
            .div_ceil(entries_per_value_block)
            .max(1);
        let estimated_total_blocks = estimated_key_blocks + estimated_value_blocks + 1;

        Ok(Self {
            file: Some(file),
            compress_buffer: Vec::with_capacity(MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE),
            block_offsets: Vec::with_capacity(estimated_total_blocks),
            pending_keys: VecDeque::with_capacity(entries_per_value_block),
            first_pending_small_index: 0,
            #[cfg(debug_assertions)]
            current_small_block_id: 0,
            pending_small_value_block: Vec::with_capacity(
                MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE,
            ),
            key_buffer: Vec::with_capacity(MAX_KEY_BLOCK_SIZE),
            collected_fingerprints: Vec::with_capacity(max_entry_count as usize),
            key_block_boundaries: Vec::with_capacity(estimated_key_blocks),
            min_hash: u64::MAX,
            max_hash: 0,
            entry_count: 0,
            flags,
            total_key_size: 0,
            total_value_size: 0,
            pending_key_total_size: 0,
            current_key_block: KeyBlockAccumulator::new(),
            #[cfg(debug_assertions)]
            finished: false,
        })
    }

    pub fn is_full(&self, max_entries: usize, max_data_size: usize) -> bool {
        self.entry_count as usize >= max_entries
            || self.total_key_size + self.total_value_size >= max_data_size
            || !self.has_block_index_capacity()
    }

    fn has_block_index_capacity(&self) -> bool {
        let blocks_written = self.block_offsets.len();
        let pending_small_block = usize::from(!self.pending_small_value_block.is_empty());
        let pending_key_blocks = self
            .pending_keys
            .len()
            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
            .max(self.pending_key_total_size.div_ceil(MAX_KEY_BLOCK_SIZE))
            .max(1);
        let index_block = 1;
        let buffer = BLOCK_INDEX_CAPACITY_BUFFER;
        blocks_written + pending_small_block + pending_key_blocks + index_block + buffer
            < u16::MAX as usize
    }

    pub fn add(&mut self, entry: E) -> Result<()> {
        let key_hash = entry.key_hash();
        let key_len = entry.key_len();

        if self.entry_count == 0 {
            self.min_hash = key_hash;
        }
        self.max_hash = key_hash;
        self.entry_count += 1;

        self.collected_fingerprints.push(key_hash as u32);

        self.total_key_size += key_len;
        self.pending_key_total_size += key_len;

        let value_ref = match entry.value() {
            EntryValue::Medium { value } => {
                self.total_value_size += value.len();
                let block_index = write_block_to_file(
                    self.file.as_mut().unwrap(),
                    &mut self.compress_buffer,
                    &mut self.block_offsets,
                    value,
                    true,
                )
                .context("Failed to write value block")?;
                ValueRef::Medium { block_index }
            }
            EntryValue::MediumRaw {
                uncompressed_size,
                checksum,
                block,
            } => {
                self.total_value_size += block.len();
                let block_index = write_raw_block_to_file(
                    self.file.as_mut().unwrap(),
                    &mut self.block_offsets,
                    uncompressed_size,
                    checksum,
                    block,
                )
                .context("Failed to write compressed value block")?;
                ValueRef::Medium { block_index }
            }
            EntryValue::Small { value } => {
                self.total_value_size += value.len();

                let offset = self.pending_small_value_block.len() as u32;
                let size: u16 = value.len().try_into().unwrap();
                self.pending_small_value_block.extend_from_slice(value);

                if self.first_pending_small_index >= self.pending_keys.len() {
                    self.first_pending_small_index = self.pending_keys.len();
                }

                let value_ref = ValueRef::PendingSmall {
                    #[cfg(debug_assertions)]
                    small_block_id: self.current_small_block_id,
                    offset,
                    size,
                };

                self.push_pending_key_entry(entry, value_ref);

                if self.pending_small_value_block.len() >= MIN_SMALL_VALUE_BLOCK_SIZE {
                    self.flush_small_value_block()?;
                }

                return Ok(());
            }
            EntryValue::Inline { value } => {
                debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
                let mut data = [0u8; MAX_INLINE_VALUE_SIZE];
                data[..value.len()].copy_from_slice(value);
                ValueRef::Inline {
                    data,
                    len: value.len() as u8,
                }
            }
            EntryValue::Large { blob } => ValueRef::Blob { blob_id: blob },
            EntryValue::Deleted => ValueRef::Deleted,
        };

        self.push_pending_key_entry(entry, value_ref);
        self.try_flush_key_blocks()
    }

    fn push_pending_key_entry(&mut self, entry: E, value_ref: ValueRef) {
        self.pending_keys
            .push_back(PendingEntry { entry, value_ref });
    }

    fn try_flush_key_blocks(&mut self) -> Result<()> {
        debug_assert!(!matches!(
            self.pending_keys.back().unwrap().value_ref,
            ValueRef::PendingSmall { .. }
        ));
        if self.first_pending_small_index != self.pending_keys.len() - 1 {
            return Ok(());
        }
        self.advance_boundary_to(self.pending_keys.len())
    }

    fn advance_boundary_to(&mut self, new_boundary: usize) -> Result<()> {
        let mut last_flushed_end = 0usize;
        let mut cumulative_key_size = 0usize;
        let mut flushed_key_size = 0usize;

        for i in self.first_pending_small_index..new_boundary {
            let entry = &self.pending_keys[i];
            let key_len = entry.entry.key_len();
            let key_hash = entry.entry.key_hash();
            let value_type = entry.value_ref.entry_type();

            if self.current_key_block.should_flush(key_len, key_hash) {
                let block_end = last_flushed_end + self.current_key_block.entry_count;
                let info = self.current_key_block.flush_info();
                self.flush_key_block(last_flushed_end, block_end, info)?;
                flushed_key_size = cumulative_key_size;
                last_flushed_end = block_end;
                self.current_key_block.reset();
            }

            cumulative_key_size += key_len;
            self.current_key_block.add(key_len, key_hash, value_type);
        }

        if last_flushed_end > 0 {
            self.pending_key_total_size -= flushed_key_size;
            self.pending_keys.drain(..last_flushed_end);
        }

        self.first_pending_small_index = new_boundary - last_flushed_end;
        Ok(())
    }

    fn flush_small_value_block(&mut self) -> Result<()> {
        if self.pending_small_value_block.is_empty() {
            return Ok(());
        }

        let block_index = write_block_to_file(
            self.file.as_mut().unwrap(),
            &mut self.compress_buffer,
            &mut self.block_offsets,
            &self.pending_small_value_block,
            true,
        )
        .context("Failed to write small value block")?;

        #[cfg(debug_assertions)]
        let flushed_id = self.current_small_block_id;
        for i in self.first_pending_small_index..self.pending_keys.len() {
            let entry = &mut self.pending_keys[i];
            if let ValueRef::PendingSmall {
                #[cfg(debug_assertions)]
                small_block_id,
                offset,
                size,
            } = entry.value_ref
            {
                #[cfg(debug_assertions)]
                debug_assert_eq!(
                    small_block_id, flushed_id,
                    "all pending small entries must reference the small value block that was just \
                     written"
                );
                entry.value_ref = ValueRef::Small {
                    block_index,
                    offset,
                    size,
                };
            }
        }

        self.advance_boundary_to(self.pending_keys.len())?;

        #[cfg(debug_assertions)]
        {
            self.current_small_block_id += 1;
        }
        self.pending_small_value_block.clear();

        Ok(())
    }

    fn flush_key_block(&mut self, start: usize, end: usize, info: KeyBlockFlushInfo) -> Result<()> {
        let entry_count = end - start;
        let has_hash = use_hash(info.max_key_len);
        let try_compress = info.max_key_len >= MIN_KEY_SIZE_FOR_COMPRESSION;

        self.key_buffer.clear();

        if let KeyBlockFormat::Fixed {
            key_len: key_size,
            value_type,
        } = info.format
        {
            let mut builder = FixedKeyBlockBuilder::new(
                &mut self.key_buffer,
                entry_count as u32,
                has_hash,
                key_size,
                value_type,
            );
            for i in start..end {
                let pending = &self.pending_keys[i];
                builder.put(&pending.entry, &pending.value_ref, has_hash);
            }
            builder.finish();
        } else {
            let mut builder =
                KeyBlockBuilder::new(&mut self.key_buffer, entry_count as u32, has_hash);

            for i in start..end {
                let pending = &self.pending_keys[i];
                builder.put(&pending.entry, &pending.value_ref, has_hash);
            }

            builder.finish();
        }

        let first_hash = self.pending_keys[start].entry.key_hash();
        let block_index = write_block_to_file(
            self.file.as_mut().unwrap(),
            &mut self.compress_buffer,
            &mut self.block_offsets,
            &self.key_buffer,
            try_compress,
        )
        .context("Failed to write key block")?;
        self.key_block_boundaries.push((first_hash, block_index));

        Ok(())
    }

    pub fn close(mut self) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
        #[cfg(debug_assertions)]
        {
            self.finished = true;
        }

        self.flush_small_value_block()?;

        self.flush_remaining_key_blocks()?;

        assert!(
            !self.key_block_boundaries.is_empty(),
            "StreamingSstWriter::close() called with no entries"
        );

        let mut file = self.file.take().unwrap();

        let index_entry_count: u16 = (self.key_block_boundaries.len() - 1)
            .try_into()
            .expect("Index entries count overflow");
        let index_block_size: usize =
            INDEX_BLOCK_HEADER_SIZE + index_entry_count as usize * INDEX_BLOCK_ENTRY_SIZE;
        let mut index_buf = Vec::with_capacity(index_block_size);
        {
            let first_block = self.key_block_boundaries[0].1;
            let mut index_block = IndexBlockBuilder::new(&mut index_buf, first_block);
            for &(hash, block) in &self.key_block_boundaries[1..] {
                index_block.put(hash, block);
            }
        }
        let index_checksum = checksum_block(&index_buf);
        write_raw_block_to_file(
            &mut file,
            &mut self.block_offsets,
            0,
            index_checksum,
            &index_buf,
        )
        .context("Failed to write index block")?;

        for offset in &self.block_offsets {
            file.write_u32::<BE>(*offset)
                .context("Failed to write block offset")?;
        }

        let block_count: u16 = self
            .block_offsets
            .len()
            .try_into()
            .expect("Block count overflow");

        let actual_count = self.collected_fingerprints.len() as u64;
        let mut builder = qfilter::Builder::new(
            qfilter::Filter::new(actual_count.max(1), AMQF_FALSE_POSITIVE_RATE)
                .expect("Filter can't be constructed"),
        );

        let fp_size = builder.fingerprint_size();
        assert!(fp_size < 32, "fp_size {fp_size} exceeds u32");
        let fp_mask = (1u32 << fp_size) - 1;
        self.collected_fingerprints
            .sort_unstable_by_key(|&h| h & fp_mask);
        for &h in &self.collected_fingerprints {
            builder
                .insert_fingerprint(false, h as u64)
                .expect("AMQF insert failed");
        }
        let filter = builder.into_filter();

        let amqf = postcard::to_allocvec(&filter).expect("AMQF serialization failed");

        let last_block_end = self.block_offsets.last().copied().unwrap_or_default() as u64;
        let offset_table_size = block_count as u64 * size_of::<u32>() as u64;
        let file_size = last_block_end + offset_table_size;

        let meta = StaticSortedFileBuilderMeta {
            min_hash: self.min_hash,
            max_hash: self.max_hash,
            amqf: Cow::Owned(amqf),
            block_count,
            size: file_size,
            flags: self.flags,
            entries: self.entry_count,
        };

        Ok((meta, file.into_inner()?))
    }

    fn flush_remaining_key_blocks(&mut self) -> Result<()> {
        if self.pending_keys.is_empty() {
            return Ok(());
        }

        debug_assert_eq!(
            self.first_pending_small_index,
            self.pending_keys.len(),
            "expected no unresolved PendingSmall entries after flush_small_value_block"
        );

        let total = self.pending_keys.len();
        let mut block_start = 0;
        let mut acc = KeyBlockAccumulator::new();

        for i in 0..total {
            let entry = &self.pending_keys[i];
            let key_len = entry.entry.key_len();
            let key_hash = entry.entry.key_hash();
            let value_type = entry.value_ref.entry_type();

            if acc.should_flush(key_len, key_hash) {
                self.flush_key_block(block_start, i, acc.flush_info())?;
                block_start = i;
                acc.reset();
            }

            acc.add(key_len, key_hash, value_type);
        }

        if block_start < total {
            self.flush_key_block(block_start, total, acc.flush_info())?;
        }

        self.pending_keys.clear();
        Ok(())
    }
}

#[cfg(debug_assertions)]
impl<E: Entry> Drop for StreamingSstWriter<E> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(
                self.finished || self.entry_count == 0,
                "StreamingSstWriter dropped without calling close()"
            );
        }
    }
}

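// A variable-length key block starts with a type byte and a big-endian u24 entry count,
// followed by one u32 header per entry (entry type in the top byte, offset of the entry
// data relative to the end of the header area in the lower bits), and then the entry
// data itself: an optional u64 key hash, the key bytes, and the encoded value reference.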
struct KeyBlockBuilder<'l> {
    current_entry: usize,
    header_size: usize,
    buffer: &'l mut Vec<u8>,
}

const KEY_BLOCK_HEADER_SIZE: usize = 4;

impl<'l> KeyBlockBuilder<'l> {
    fn new(buffer: &'l mut Vec<u8>, entry_count: u32, has_hash: bool) -> Self {
        debug_assert!(entry_count < (1 << 24));

        const ESTIMATED_KEY_SIZE: usize = 16;
        buffer.reserve(entry_count as usize * ESTIMATED_KEY_SIZE);
        let block_type = if has_hash {
            BLOCK_TYPE_KEY_WITH_HASH
        } else {
            BLOCK_TYPE_KEY_NO_HASH
        };
        buffer.write_u8(block_type).unwrap();
        buffer.write_u24::<BE>(entry_count).unwrap();
        for _ in 0..entry_count {
            buffer.write_u32::<BE>(0).unwrap();
        }
        Self {
            current_entry: 0,
            header_size: buffer.len(),
            buffer,
        }
    }

    fn write_entry_header(&mut self, entry_type: EntryType) {
        let pos = self.buffer.len() - self.header_size;
        let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4;
        let header = (pos as u32) | ((entry_type.0 as u32) << 24);
        BE::write_u32(&mut self.buffer[header_offset..header_offset + 4], header);
    }

    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
        self.write_entry_header(value_ref.entry_type());
        if has_hash {
            self.buffer
                .extend_from_slice(&entry.key_hash().to_be_bytes());
        }
        entry.write_key_to(self.buffer);
        value_ref.write_value_to(self.buffer);
        self.current_entry += 1;
    }

    fn finish(self) -> &'l mut Vec<u8> {
        self.buffer
    }
}

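// A fixed key block uses a 6 byte header (type byte, u24 entry count, key size, value
// type) and then stores entries back to back with a fixed stride, so no per-entry
// offset table is needed. It is only emitted when every key in the block has the same
// length and every value reference has the same type (see `KeyBlockFormat::update`).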
const FIXED_KEY_BLOCK_HEADER_SIZE: usize = 6;

struct FixedKeyBlockBuilder<'l> {
    buffer: &'l mut Vec<u8>,
}

impl<'l> FixedKeyBlockBuilder<'l> {
    fn new(
        buffer: &'l mut Vec<u8>,
        entry_count: u32,
        has_hash: bool,
        key_size: u8,
        value_type: EntryType,
    ) -> Self {
        let hash_len: usize = if has_hash { 8 } else { 0 };
        let val_size = value_type_val_size(value_type);
        let stride = hash_len + key_size as usize + val_size;
        buffer.reserve(FIXED_KEY_BLOCK_HEADER_SIZE + entry_count as usize * stride);

        let block_type = if has_hash {
            BLOCK_TYPE_FIXED_KEY_WITH_HASH
        } else {
            BLOCK_TYPE_FIXED_KEY_NO_HASH
        };
        buffer.extend_from_slice(&[
            block_type,
            (entry_count >> 16) as u8,
            (entry_count >> 8) as u8,
            entry_count as u8,
            key_size,
            value_type.0,
        ]);

        Self { buffer }
    }

    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
        if has_hash {
            self.buffer
                .extend_from_slice(&entry.key_hash().to_be_bytes());
        }
        entry.write_key_to(self.buffer);
        value_ref.write_value_to(self.buffer);
    }

    fn finish(self) -> &'l mut Vec<u8> {
        self.buffer
    }
}

fn value_type_val_size(ty: EntryType) -> usize {
    match ty.0 {
        KEY_BLOCK_ENTRY_TYPE_SMALL => SMALL_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_MEDIUM => MEDIUM_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_BLOB => BLOB_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_DELETED => DELETED_VALUE_REF_SIZE,
        ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
            (ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize
        }
        _ => panic!("Invalid key block entry type: {:?}", ty),
    }
}

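// The index block maps key hashes to key blocks: a type byte, the index of the first
// key block, and then one (u64 first-hash, u16 block index) pair per remaining key
// block, all big-endian.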
struct IndexBlockBuilder<W: Write> {
    writer: W,
}

pub(crate) const INDEX_BLOCK_ENTRY_SIZE: usize = size_of::<u64>() + size_of::<u16>();

pub(crate) const INDEX_BLOCK_HEADER_SIZE: usize = size_of::<u8>() + size_of::<u16>();

impl<W: Write> IndexBlockBuilder<W> {
    fn new(mut writer: W, first_block: u16) -> Self {
        writer.write_u8(BLOCK_TYPE_INDEX).unwrap();
        writer.write_u16::<BE>(first_block).unwrap();
        Self { writer }
    }

    fn put(&mut self, hash: u64, block: u16) {
        self.writer.write_u64::<BE>(hash).unwrap();
        self.writer.write_u16::<BE>(block).unwrap();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        key::hash_key,
        lookup_entry::LookupValue,
        static_sorted_file::{
            BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData,
        },
    };

    fn make_cache() -> BlockCache {
        BlockCache::with(
            100,
            4 * 1024 * 1024,
            Default::default(),
            Default::default(),
            Default::default(),
        )
    }

    struct TestEntry {
        key: Vec<u8>,
        hash: u64,
        value_kind: TestValueKind,
    }

    enum TestValueKind {
        Inline(Vec<u8>),
        Small(Vec<u8>),
        Medium(Vec<u8>),
        MediumRaw(Vec<u8>),
        Blob(u32),
        Deleted,
    }

    impl TestEntry {
        fn new(key: &[u8], value_kind: TestValueKind) -> Self {
            let key = key.to_vec();
            let hash = hash_key(&key);
            Self {
                key,
                hash,
                value_kind,
            }
        }

        fn small(key: &[u8], value: &[u8]) -> Self {
            Self::new(key, TestValueKind::Small(value.to_vec()))
        }

        fn inline(key: &[u8], value: &[u8]) -> Self {
            debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
            Self::new(key, TestValueKind::Inline(value.to_vec()))
        }

        fn medium(key: &[u8], value: &[u8]) -> Self {
            Self::new(key, TestValueKind::Medium(value.to_vec()))
        }

        fn blob(key: &[u8], blob_id: u32) -> Self {
            Self::new(key, TestValueKind::Blob(blob_id))
        }

        fn deleted(key: &[u8]) -> Self {
            Self::new(key, TestValueKind::Deleted)
        }

        fn medium_raw(key: &[u8], value: &[u8]) -> Self {
            Self::new(key, TestValueKind::MediumRaw(value.to_vec()))
        }

        fn expected_value(&self) -> Option<&[u8]> {
            match &self.value_kind {
                TestValueKind::Inline(v)
                | TestValueKind::Small(v)
                | TestValueKind::Medium(v)
                | TestValueKind::MediumRaw(v) => Some(v),
                _ => None,
            }
        }
    }

    impl Entry for TestEntry {
        fn key_hash(&self) -> u64 {
            self.hash
        }

        fn key_len(&self) -> usize {
            self.key.len()
        }

        fn write_key_to(&self, buf: &mut Vec<u8>) {
            buf.extend_from_slice(&self.key);
        }

        fn value(&self) -> EntryValue<'_> {
            match &self.value_kind {
                TestValueKind::Inline(v) => EntryValue::Inline { value: v },
                TestValueKind::Small(v) => EntryValue::Small { value: v },
                TestValueKind::Medium(v) => EntryValue::Medium { value: v },
                TestValueKind::MediumRaw(v) => EntryValue::MediumRaw {
                    uncompressed_size: 0,
                    checksum: checksum_block(v),
                    block: v,
                },
                TestValueKind::Blob(id) => EntryValue::Large { blob: *id },
                TestValueKind::Deleted => EntryValue::Deleted,
            }
        }
    }

    fn sort_entries(entries: &mut [TestEntry]) {
        entries.sort_by_key(|e| e.hash);
    }

    fn open_sst(
        dir: &Path,
        seq: u32,
        meta: &StaticSortedFileBuilderMeta<'_>,
    ) -> Result<StaticSortedFile> {
        StaticSortedFile::open(
            dir,
            StaticSortedFileMetaData {
                sequence_number: seq,
                block_count: meta.block_count,
            },
        )
    }

    fn write_sst(
        dir: &Path,
        seq: u32,
        entries: &[TestEntry],
        flags: MetaEntryFlags,
    ) -> Result<StaticSortedFileBuilderMeta<'static>> {
        let sst_path = dir.join(format!("{seq:08}.sst"));
        let mut writer = StreamingSstWriter::new(&sst_path, flags, entries.len() as u64)?;
        for entry in entries {
            writer.add(entry)?;
        }
        let (meta, _file) = writer.close()?;
        Ok(meta)
    }

    fn assert_lookup(
        sst: &StaticSortedFile,
        entry: &TestEntry,
        kc: &BlockCache,
        vc: &BlockCache,
    ) -> Result<()> {
        let result = sst.lookup::<_, false>(entry.hash, &entry.key, kc, vc)?;
        match (&entry.value_kind, result) {
            (_, SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Slice { .. }) =>
            {
                let LookupValue::Slice { value } = &values[0] else {
                    unreachable!()
                };
                let expected = entry
                    .expected_value()
                    .expect("Got Slice but entry has no value");
                assert_eq!(
                    value.as_ref(),
                    expected,
                    "value mismatch for key {:?}",
                    std::str::from_utf8(&entry.key)
                );
            }
            (TestValueKind::Blob(expected_id), SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Blob { .. }) =>
            {
                let LookupValue::Blob { sequence_number } = &values[0] else {
                    unreachable!()
                };
                assert_eq!(*sequence_number, *expected_id);
            }
            (TestValueKind::Deleted, SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Deleted) => {}
            _ => {
                panic!(
                    "Unexpected lookup result for key {:?}",
                    std::str::from_utf8(&entry.key)
                );
            }
        }
        Ok(())
    }

    #[test]
    fn single_inline_entry() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let mut entries = vec![TestEntry::inline(b"key1", b"val1")];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 1);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        assert_lookup(&sst, &entries[0], &kc, &vc)?;
        Ok(())
    }

    #[test]
    fn single_small_entry() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let value = vec![0xAB; 100];
        let mut entries = vec![TestEntry::small(b"skey", &value)];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 1);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        assert_lookup(&sst, &entries[0], &kc, &vc)?;
        Ok(())
    }

    #[test]
    fn single_medium_entry() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let value = vec![0xCD; 8192];
        let mut entries = vec![TestEntry::medium(b"mkey", &value)];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 1);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        assert_lookup(&sst, &entries[0], &kc, &vc)?;
        Ok(())
    }

    #[test]
    fn single_blob_entry() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let mut entries = vec![TestEntry::blob(b"bkey", 42)];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 1);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        assert_lookup(&sst, &entries[0], &kc, &vc)?;
        Ok(())
    }

    #[test]
    fn single_deleted_entry() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let mut entries = vec![TestEntry::deleted(b"dkey")];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 1);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        assert_lookup(&sst, &entries[0], &kc, &vc)?;
        Ok(())
    }

    #[test]
    fn many_small_values() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let count = 200;
        let mut entries: Vec<TestEntry> = (0..count)
            .map(|i| {
                let key = format!("key-{i:04}");
                let value = vec![(i & 0xFF) as u8; 200];
                TestEntry::small(key.as_bytes(), &value)
            })
            .collect();
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, count as u64);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();

        for entry in &entries {
            assert_lookup(&sst, entry, &kc, &vc)?;
        }
        Ok(())
    }

    #[test]
    fn many_medium_values() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let count = 50;
        let mut entries: Vec<TestEntry> = (0..count)
            .map(|i| {
                let key = format!("mkey-{i:04}");
                let value = vec![(i & 0xFF) as u8; 8192];
                TestEntry::medium(key.as_bytes(), &value)
            })
            .collect();
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, count as u64);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();

        for entry in &entries {
            assert_lookup(&sst, entry, &kc, &vc)?;
        }
        Ok(())
    }

    #[test]
    fn mixed_value_types() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let mut entries = vec![
            TestEntry::inline(b"a-inline", b"tiny"),
            TestEntry::small(b"b-small", &[0x11; 200]),
            TestEntry::medium(b"c-medium", &[0x22; 8192]),
            TestEntry::blob(b"d-blob", 99),
            TestEntry::deleted(b"e-deleted"),
            TestEntry::small(b"f-small2", &[0x33; 300]),
            TestEntry::inline(b"g-inline2", b"mini"),
            TestEntry::medium(b"h-medium2", &[0x44; 16384]),
        ];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 8);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();

        for entry in &entries {
            assert_lookup(&sst, entry, &kc, &vc)?;
        }
        Ok(())
    }

    #[test]
    fn is_full_entry_count_limit() {
        let dir = tempfile::tempdir().unwrap();
        let sst_path = dir.path().join("test.sst");
        let mut writer =
            StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();

        let max_entries = 50;
        for i in 0..max_entries {
            let key = format!("k{i:06}");
            let entry = TestEntry::inline(key.as_bytes(), &[0; 4]);
            writer.add(entry).unwrap();
        }

        assert_eq!(writer.entry_count, max_entries as u64);
        assert!(
            writer.is_full(max_entries, usize::MAX),
            "Should be full when entry count reaches max_entries"
        );
        assert!(
            !writer.is_full(max_entries + 1, usize::MAX),
            "Should not be full when limit is higher"
        );
        writer.close().unwrap();
    }

    #[test]
    fn is_full_data_size_limit() {
        let dir = tempfile::tempdir().unwrap();
        let sst_path = dir.path().join("test.sst");
        let mut writer =
            StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();

        let value = vec![0u8; 1000];
        for i in 0..10 {
            let key = format!("k{i:06}");
            let entry = TestEntry::small(key.as_bytes(), &value);
            writer.add(entry).unwrap();
        }

        let total = writer.total_key_size + writer.total_value_size;
        assert!(total > 10_000, "total data should exceed 10KB");
        assert!(writer.is_full(usize::MAX, total - 1));
        assert!(!writer.is_full(usize::MAX, total + 1));
        writer.close().unwrap();
    }

    #[test]
    fn write_static_stored_file_matches_streaming() -> Result<()> {
        let dir = tempfile::tempdir()?;

        let mut entries: Vec<TestEntry> = (0..100)
            .map(|i| {
                let key = format!("rkey-{i:04}");
                if i % 3 == 0 {
                    TestEntry::inline(key.as_bytes(), &[(i & 0xFF) as u8; 4])
                } else if i % 3 == 1 {
                    TestEntry::small(key.as_bytes(), &[(i & 0xFF) as u8; 200])
                } else {
                    TestEntry::medium(key.as_bytes(), &[(i & 0xFF) as u8; 8192])
                }
            })
            .collect();
        sort_entries(&mut entries);

        let batch_path = dir.path().join("00000001.sst");
        let (meta1, _) =
            write_static_stored_file(&entries, &batch_path, MetaEntryFlags::default())?;

        let streaming_path = dir.path().join("00000002.sst");
        let mut writer = StreamingSstWriter::new(
            &streaming_path,
            MetaEntryFlags::default(),
            entries.len() as u64,
        )?;
        for entry in &entries {
            writer.add(entry)?;
        }
        let (meta2, _) = writer.close()?;

        assert_eq!(meta1.entries, meta2.entries);
        assert_eq!(meta1.min_hash, meta2.min_hash);
        assert_eq!(meta1.max_hash, meta2.max_hash);
        assert_eq!(meta1.block_count, meta2.block_count);

        let sst1 = StaticSortedFile::open(
            dir.path(),
            StaticSortedFileMetaData {
                sequence_number: 1,
                block_count: meta1.block_count,
            },
        )?;
        let sst2 = StaticSortedFile::open(
            dir.path(),
            StaticSortedFileMetaData {
                sequence_number: 2,
                block_count: meta2.block_count,
            },
        )?;
        let kc = make_cache();
        let vc = make_cache();

        for entry in &entries {
            let r1 = sst1.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
            let r2 = sst2.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
            match (&r1, &r2) {
                (SstLookupResult::Found(v1), SstLookupResult::Found(v2))
                    if v1.len() == 1 && v2.len() == 1 =>
                {
                    match (&v1[0], &v2[0]) {
                        (
                            LookupValue::Slice { value: val1 },
                            LookupValue::Slice { value: val2 },
                        ) => {
                            assert_eq!(
                                val1.as_ref(),
                                val2.as_ref(),
                                "Value mismatch for key {:?}",
                                std::str::from_utf8(&entry.key)
                            );
                        }
                        (LookupValue::Deleted, LookupValue::Deleted) => {}
                        (
                            LookupValue::Blob {
                                sequence_number: s1,
                            },
                            LookupValue::Blob {
                                sequence_number: s2,
                            },
                        ) => {
                            assert_eq!(s1, s2);
                        }
                        _ => panic!(
                            "Mismatched results for key {:?}",
                            std::str::from_utf8(&entry.key)
                        ),
                    }
                }
                _ => panic!(
                    "Mismatched results for key {:?}",
                    std::str::from_utf8(&entry.key)
                ),
            }
        }
        Ok(())
    }

    #[test]
    #[should_panic(expected = "StreamingSstWriter::close() called with no entries")]
    fn close_empty_writer_panics() {
        let dir = tempfile::tempdir().unwrap();
        let sst_path = dir.path().join("empty.sst");
        let writer =
            StreamingSstWriter::<TestEntry>::new(&sst_path, MetaEntryFlags::default(), 0).unwrap();
        writer.close().unwrap();
    }

    #[test]
    fn key_block_boundary_at_max_entries() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let count = MAX_KEY_BLOCK_ENTRIES + 1;
        let mut entries: Vec<TestEntry> = (0..count)
            .map(|i| {
                let key = format!("boundary-{i:06}");
                TestEntry::inline(key.as_bytes(), &[0u8; 4])
            })
            .collect();
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, count as u64);
        assert!(
            meta.block_count >= 3,
            "expected at least 2 key blocks + 1 index block"
        );

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        for entry in &entries {
            assert_lookup(&sst, entry, &kc, &vc)?;
        }
        Ok(())
    }

    #[test]
    fn single_medium_raw_entry() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let value = vec![0xBE; 8192];
        let mut entries = vec![TestEntry::medium_raw(b"rkey", &value)];
        sort_entries(&mut entries);

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
        assert_eq!(meta.entries, 1);

        let sst = open_sst(dir.path(), 1, &meta)?;
        let kc = make_cache();
        let vc = make_cache();
        assert_lookup(&sst, &entries[0], &kc, &vc)?;
        Ok(())
    }

    fn corrupt_sst_byte(dir: &Path, seq: u32, pos: u64) {
        use std::io::{Seek, SeekFrom, Write as _};

        let sst_path = dir.join(format!("{seq:08}.sst"));
        let file_bytes = std::fs::read(&sst_path).unwrap();
        let original = file_bytes[pos as usize];
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .open(&sst_path)
            .unwrap();
        file.seek(SeekFrom::Start(pos)).unwrap();
        file.write_all(&[original ^ 0xFF]).unwrap();
        file.sync_all().unwrap();
    }

    fn assert_corruption_detected(
        dir: &Path,
        seq: u32,
        meta: &StaticSortedFileBuilderMeta<'_>,
        entries: &[TestEntry],
    ) {
        let sst = open_sst(dir, seq, meta).unwrap();
        let kc = make_cache();
        let vc = make_cache();
        match sst.lookup::<_, false>(entries[0].hash, &entries[0].key, &kc, &vc) {
            Err(err) => {
                let msg = format!("{err}");
                assert!(
                    msg.contains("corruption"),
                    "Expected corruption error, got: {msg}"
                );
            }
            Ok(_) => panic!("Expected checksum error, but lookup succeeded"),
        }
    }

    #[test]
    fn checksum_detects_corrupted_compressed_block() {
        let dir = tempfile::tempdir().unwrap();
        let value = vec![0xCD; 8192];
        let entries = vec![TestEntry::medium(b"mkey", &value)];

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();

        corrupt_sst_byte(dir.path(), 1, 4);
        assert_corruption_detected(dir.path(), 1, &meta, &entries);
    }

    #[test]
    fn checksum_detects_corrupted_uncompressed_block() {
        let dir = tempfile::tempdir().unwrap();
        let entries = vec![TestEntry::inline(b"key1", b"val1")];

        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();

        corrupt_sst_byte(dir.path(), 1, BLOCK_HEADER_SIZE as u64 + 1);
        assert_corruption_detected(dir.path(), 1, &meta, &entries);
    }
}