1use std::{
2 borrow::Cow,
3 collections::VecDeque,
4 fs::File,
5 io::{BufWriter, Write},
6 path::Path,
7};
8
9use anyhow::{Context, Result};
10use byteorder::{BE, ByteOrder, WriteBytesExt};
11use turbo_bincode::turbo_bincode_encode;
12
13use crate::{
14 compression::{checksum_block, compress_into_buffer},
15 constants::{MAX_INLINE_VALUE_SIZE, MAX_SMALL_VALUE_SIZE, MIN_SMALL_VALUE_BLOCK_SIZE},
16 meta_file::{AmqfBincodeWrapper, MetaEntryFlags},
17 static_sorted_file::{
18 BLOB_VALUE_REF_SIZE, BLOCK_TYPE_FIXED_KEY_NO_HASH, BLOCK_TYPE_FIXED_KEY_WITH_HASH,
19 BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY_NO_HASH, BLOCK_TYPE_KEY_WITH_HASH, DELETED_VALUE_REF_SIZE,
20 KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, KEY_BLOCK_ENTRY_TYPE_INLINE_MIN,
21 KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, MEDIUM_VALUE_REF_SIZE,
22 SMALL_VALUE_REF_SIZE,
23 },
24};
25
/// Size of the per-block header written before every block:
/// 4 bytes uncompressed size + 4 bytes checksum (see `write_raw_block_to_file`).
pub const BLOCK_HEADER_SIZE: usize = 8;

/// Upper bound on entries per key block, derived from the block size limit and
/// the per-entry metadata overhead.
const MAX_KEY_BLOCK_ENTRIES: usize = MAX_KEY_BLOCK_SIZE / KEY_BLOCK_ENTRY_META_OVERHEAD;
/// Target maximum byte size of a single key block.
const MAX_KEY_BLOCK_SIZE: usize = 16 * 1024;
/// Bytes of bookkeeping accounted for each key-block entry in addition to the
/// key bytes themselves.
// NOTE(review): presumably covers the header slot + hash + value ref — confirm.
const KEY_BLOCK_ENTRY_META_OVERHEAD: usize = 20;
/// Target false-positive rate for the AMQF (approximate membership) filter.
const AMQF_FALSE_POSITIVE_RATE: f64 = 0.01;
/// Assumed average size of a small value; used only for capacity estimation.
const AVG_SMALL_VALUE_SIZE: usize = 64;

/// Safety margin used when checking that the u16 block-index space is not
/// about to be exhausted (see `has_block_index_capacity`).
const BLOCK_INDEX_CAPACITY_BUFFER: usize = 16;

/// Key blocks whose longest key is shorter than this are not worth compressing.
const MIN_KEY_SIZE_FOR_COMPRESSION: usize = 16;

/// Largest key length usable with the fixed-key block layout
/// (the shared key length is stored in a single byte).
const MAX_FIXED_KEY_LEN: usize = u8::MAX as usize;
68
/// Tag byte describing how an entry's value is stored: one of the
/// `KEY_BLOCK_ENTRY_TYPE_*` constants, with inline values encoding their
/// length relative to `KEY_BLOCK_ENTRY_TYPE_INLINE_MIN`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EntryType(u8);
75
/// Tracks whether all entries seen so far in a key block share a single key
/// length and value type, which enables the compact fixed-layout encoding.
#[derive(Clone, Copy)]
enum KeyBlockFormat {
    /// No entry has been added yet.
    Unknown,
    /// Every entry so far has exactly this key length and value type.
    Fixed { key_len: u8, value_type: EntryType },
    /// Entries differ in key length or value type; the variable layout is used.
    Variable,
}
92
93impl KeyBlockFormat {
94 fn update(&mut self, key_len: usize, value_type: EntryType) {
99 *self = match *self {
100 KeyBlockFormat::Unknown => {
101 if key_len <= MAX_FIXED_KEY_LEN {
102 KeyBlockFormat::Fixed {
103 key_len: key_len as u8,
104 value_type,
105 }
106 } else {
107 KeyBlockFormat::Variable
108 }
109 }
110 KeyBlockFormat::Fixed {
111 key_len: k,
112 value_type: v,
113 } if k as usize == key_len && v == value_type => KeyBlockFormat::Fixed {
114 key_len: k,
115 value_type: v,
116 },
117 KeyBlockFormat::Fixed { .. } | KeyBlockFormat::Variable => KeyBlockFormat::Variable,
118 };
119 }
120}
121
/// Snapshot of the accumulator state needed to serialize a key block.
#[derive(Clone, Copy)]
struct KeyBlockFlushInfo {
    /// Longest key in the block; decides hash storage and compression.
    max_key_len: usize,
    /// Fixed vs. variable layout classification for the block.
    format: KeyBlockFormat,
}
128
/// Running statistics for the key block currently being formed.
struct KeyBlockAccumulator {
    /// Estimated serialized size: key bytes plus per-entry overhead.
    size: usize,
    /// Number of entries accumulated so far.
    entry_count: usize,
    /// Longest key seen in this block.
    max_key_len: usize,
    /// Hash of the most recently added entry (used to avoid splitting
    /// equal-hash runs across blocks).
    last_hash: u64,
    /// Fixed vs. variable layout classification.
    format: KeyBlockFormat,
}
147
148impl KeyBlockAccumulator {
149 fn new() -> Self {
150 Self {
151 size: 0,
152 entry_count: 0,
153 max_key_len: 0,
154 last_hash: 0,
155 format: KeyBlockFormat::Unknown,
156 }
157 }
158
159 fn add(&mut self, key_len: usize, key_hash: u64, value_type: EntryType) {
161 self.size += key_len + KEY_BLOCK_ENTRY_META_OVERHEAD;
162 self.max_key_len = self.max_key_len.max(key_len);
163 self.entry_count += 1;
164 self.last_hash = key_hash;
165 self.format.update(key_len, value_type);
166 }
167
168 fn flush_info(&self) -> KeyBlockFlushInfo {
170 KeyBlockFlushInfo {
171 max_key_len: self.max_key_len,
172 format: self.format,
173 }
174 }
175
176 fn should_flush(&self, next_key_len: usize, next_key_hash: u64) -> bool {
180 if self.entry_count == 0 {
181 return false;
182 }
183 let would_exceed_size =
184 self.size + next_key_len + KEY_BLOCK_ENTRY_META_OVERHEAD > MAX_KEY_BLOCK_SIZE;
185 let would_exceed_entries = self.entry_count >= MAX_KEY_BLOCK_ENTRIES;
186 (would_exceed_size || would_exceed_entries) && self.last_hash != next_key_hash
188 }
189
190 fn reset(&mut self) {
192 self.size = 0;
193 self.entry_count = 0;
194 self.max_key_len = 0;
195 self.format = KeyBlockFormat::Unknown;
196 }
198}
199
/// Whether key blocks with this maximum key length should store the 8-byte
/// key hash alongside each entry: worthwhile only once keys grow past the
/// threshold, since the hash then speeds up comparisons during lookup.
fn use_hash(max_key_len: usize) -> bool {
    const HASH_WORTHWHILE_ABOVE: usize = 32;
    max_key_len > HASH_WORTHWHILE_ABOVE
}
204
/// An entry to be written into a static sorted file.
pub trait Entry {
    /// Hash of the key; entries must be supplied in ascending hash order.
    fn key_hash(&self) -> u64;
    /// Length of the key in bytes (must match what `write_key_to` appends).
    fn key_len(&self) -> usize;
    /// Appends the raw key bytes to `buf`.
    fn write_key_to(&self, buf: &mut Vec<u8>);

    /// The entry's value and its storage class.
    fn value(&self) -> EntryValue<'_>;
}
217
/// Delegation impl so that iterating `&[E]` can hand `&E` straight to the
/// writer without cloning (see `write_static_stored_file`).
impl<E: Entry> Entry for &E {
    fn key_hash(&self) -> u64 {
        (*self).key_hash()
    }
    fn key_len(&self) -> usize {
        (*self).key_len()
    }
    fn write_key_to(&self, buf: &mut Vec<u8>) {
        (*self).write_key_to(buf)
    }
    fn value(&self) -> EntryValue<'_> {
        (*self).value()
    }
}
232
/// Borrowed value of an entry as supplied by the caller; the variant decides
/// where the value bytes end up in the file.
#[derive(Copy, Clone)]
pub enum EntryValue<'l> {
    /// Tiny value stored directly inside the key block.
    Inline { value: &'l [u8] },
    /// Small value packed together with others into a shared value block.
    Small { value: &'l [u8] },
    /// Medium value written as its own (possibly compressed) block.
    Medium { value: &'l [u8] },
    /// Medium value that is already compressed and checksummed; written verbatim.
    MediumRaw {
        /// Size before compression; 0 marks an uncompressed block
        /// (matching the convention used by `write_block_to_file`).
        uncompressed_size: u32,
        /// Checksum of the stored block bytes.
        checksum: u32,
        block: &'l [u8],
    },
    /// Large value stored out-of-line in a blob file, referenced by id.
    Large { blob: u32 },
    /// Tombstone marking the key as deleted.
    Deleted,
}
256
/// Metadata describing a finished static sorted file, to be recorded in the
/// meta file.
#[derive(Debug, Clone)]
pub struct StaticSortedFileBuilderMeta<'a> {
    /// Smallest key hash in the file.
    pub min_hash: u64,
    /// Largest key hash in the file.
    pub max_hash: u64,
    /// Serialized AMQF filter over all key hashes.
    pub amqf: Cow<'a, [u8]>,
    /// Number of blocks in the file (including the index block).
    pub block_count: u16,
    /// Total file size in bytes (blocks plus trailing offset table).
    pub size: u64,
    pub flags: MetaEntryFlags,
    /// Number of entries written.
    pub entries: u64,
}
274
275pub fn write_static_stored_file<E: Entry>(
282 entries: &[E],
283 file: &Path,
284 flags: MetaEntryFlags,
285) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
286 debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted());
287 let mut writer = StreamingSstWriter::new(file, flags, entries.len() as u64)?;
288 for entry in entries {
289 writer.add(entry)?;
290 }
291 writer.close()
292}
293
294fn write_raw_block_to_file(
303 file: &mut BufWriter<File>,
304 block_offsets: &mut Vec<u32>,
305 uncompressed_size: u32,
306 checksum: u32,
307 block: &[u8],
308) -> Result<u16> {
309 let block_index: u16 = block_offsets
310 .len()
311 .try_into()
312 .expect("Block index overflow");
313
314 let len: u32 = (block.len() + BLOCK_HEADER_SIZE).try_into().unwrap();
315 let offset = block_offsets
316 .last()
317 .copied()
318 .unwrap_or_default()
319 .checked_add(len)
320 .expect("Block offset overflow");
321 block_offsets.push(offset);
322
323 file.write_u32::<BE>(uncompressed_size)
324 .context("Failed to write uncompressed size")?;
325 file.write_u32::<BE>(checksum)
326 .context("Failed to write checksum")?;
327 file.write_all(block)
328 .context("Failed to write block data")?;
329 Ok(block_index)
330}
331
332fn write_block_to_file(
334 file: &mut BufWriter<File>,
335 compress_buffer: &mut Vec<u8>,
336 block_offsets: &mut Vec<u32>,
337 block: &[u8],
338 try_compress: bool,
339) -> Result<u16> {
340 let (uncompressed_size, data_to_write): (u32, &[u8]) = if try_compress {
341 compress_into_buffer(block, compress_buffer)?;
342 if compress_buffer.len() < block.len() - (block.len() / 8) {
344 (block.len().try_into().unwrap(), compress_buffer.as_slice())
345 } else {
346 (0, block)
347 }
348 } else {
349 (0, block)
350 };
351
352 let checksum = checksum_block(data_to_write);
354
355 let result = write_raw_block_to_file(
356 file,
357 block_offsets,
358 uncompressed_size,
359 checksum,
360 data_to_write,
361 );
362 compress_buffer.clear();
363 result
364}
365
/// Resolved (or to-be-resolved) location of an entry's value, as it will be
/// serialized into the key block.
enum ValueRef {
    /// Value stored inside a shared small-value block that is already written.
    Small {
        block_index: u16,
        offset: u32,
        size: u16,
    },
    /// Value appended to the small-value block still being accumulated;
    /// rewritten to `Small` once that block is flushed.
    PendingSmall {
        // Debug-only: which small block this entry belongs to.
        #[cfg(debug_assertions)]
        small_block_id: u16,
        offset: u32,
        size: u16,
    },
    /// Value written to its own (medium) block.
    Medium { block_index: u16 },
    /// Value stored directly in the key block.
    Inline {
        data: [u8; MAX_INLINE_VALUE_SIZE],
        len: u8,
    },
    /// Value stored out-of-line in a blob file.
    Blob { blob_id: u32 },
    /// Tombstone; serializes to nothing.
    Deleted,
}
398
399impl ValueRef {
400 fn entry_type(&self) -> EntryType {
402 EntryType(match self {
403 ValueRef::Small { .. } | ValueRef::PendingSmall { .. } => KEY_BLOCK_ENTRY_TYPE_SMALL,
404 ValueRef::Medium { .. } => KEY_BLOCK_ENTRY_TYPE_MEDIUM,
405 ValueRef::Inline { len, .. } => KEY_BLOCK_ENTRY_TYPE_INLINE_MIN + *len,
406 ValueRef::Blob { .. } => KEY_BLOCK_ENTRY_TYPE_BLOB,
407 ValueRef::Deleted => KEY_BLOCK_ENTRY_TYPE_DELETED,
408 })
409 }
410
411 fn write_value_to(&self, buffer: &mut Vec<u8>) {
416 match self {
417 ValueRef::Small {
418 block_index,
419 offset,
420 size,
421 } => {
422 let mut scratch = [0u8; 8];
423 BE::write_u16(&mut scratch, *block_index);
424 BE::write_u16(&mut scratch[2..], *size);
425 BE::write_u32(&mut scratch[4..], *offset);
426 buffer.extend(&scratch);
427 }
428 ValueRef::Medium { block_index } => {
429 let mut scratch = [0u8; 2];
430 BE::write_u16(&mut scratch, *block_index);
431 buffer.extend(scratch);
432 }
433 ValueRef::Inline { data, len } => {
434 buffer.extend(&data[..*len as usize]);
435 }
436 ValueRef::Blob { blob_id } => {
437 let mut scratch = [0u8; 4];
438 BE::write_u32(&mut scratch, *blob_id);
439 buffer.extend(scratch);
440 }
441 ValueRef::Deleted => { }
442 ValueRef::PendingSmall { .. } => {
443 unreachable!("PendingSmall should have been resolved");
444 }
445 }
446 }
447}
448
/// An entry waiting for its key block to be written, together with the
/// (possibly still unresolved) location of its value.
struct PendingEntry<E> {
    entry: E,
    value_ref: ValueRef,
}
453
/// Incrementally builds a static sorted file: value blocks are written as
/// entries arrive, key entries are buffered until their small values have a
/// final block index, and the index block plus offset table are appended on
/// `close()`.
pub struct StreamingSstWriter<E: Entry> {
    /// Output file; `None` only after `close()` has taken it.
    file: Option<BufWriter<File>>,
    /// Scratch buffer reused for block compression.
    compress_buffer: Vec<u8>,
    /// Cumulative end offset of each block written so far (headers included).
    block_offsets: Vec<u32>,

    /// Entries whose key block has not been written yet.
    pending_keys: VecDeque<PendingEntry<E>>,

    /// Index into `pending_keys` of the first entry whose value is an
    /// unresolved `PendingSmall`; equals `pending_keys.len()` when all pending
    /// entries are resolved.
    first_pending_small_index: usize,

    // Debug-only id of the small value block currently being accumulated.
    #[cfg(debug_assertions)]
    current_small_block_id: u16,

    /// Concatenated small values waiting to be flushed as one shared block.
    pending_small_value_block: Vec<u8>,

    /// Scratch buffer reused for serializing key blocks.
    key_buffer: Vec<u8>,

    /// AMQF filter over all key hashes; `None` only after `close()`.
    filter: Option<qfilter::Filter>,

    /// (first key hash, block index) of every key block written, in order.
    key_block_boundaries: Vec<(u64, u16)>,

    /// Smallest key hash added so far.
    min_hash: u64,
    /// Largest key hash added so far.
    max_hash: u64,
    /// Number of entries added.
    entry_count: u64,
    flags: MetaEntryFlags,

    /// Total key bytes added; feeds `is_full`.
    total_key_size: usize,
    /// Total value bytes added; feeds `is_full`.
    total_value_size: usize,

    /// Key bytes currently held in `pending_keys`.
    pending_key_total_size: usize,

    /// Accumulator for the key block being formed from resolved entries.
    current_key_block: KeyBlockAccumulator,

    // Debug-only flag proving `close()` was called before drop.
    #[cfg(debug_assertions)]
    finished: bool,
}
536
impl<E: Entry> StreamingSstWriter<E> {
    /// Creates a writer for up to `max_entry_count` entries. The count sizes
    /// the AMQF filter and the various pre-allocated buffers; it is an upper
    /// bound, not a commitment.
    pub fn new(file: &Path, flags: MetaEntryFlags, max_entry_count: u64) -> Result<Self> {
        let file = BufWriter::new(File::create(file)?);
        let filter = qfilter::Filter::new(max_entry_count.max(1), AMQF_FALSE_POSITIVE_RATE)
            .expect("Filter can't be constructed");

        // Capacity estimates only; all collections grow as needed.
        let estimated_key_blocks = (max_entry_count as usize)
            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
            .max(1);
        let entries_per_value_block = MIN_SMALL_VALUE_BLOCK_SIZE / AVG_SMALL_VALUE_SIZE;
        let estimated_value_blocks = (max_entry_count as usize)
            .div_ceil(entries_per_value_block)
            .max(1);
        let estimated_total_blocks = estimated_key_blocks + estimated_value_blocks + 1;

        Ok(Self {
            file: Some(file),
            compress_buffer: Vec::with_capacity(MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE),
            block_offsets: Vec::with_capacity(estimated_total_blocks),
            pending_keys: VecDeque::with_capacity(entries_per_value_block),
            first_pending_small_index: 0,
            #[cfg(debug_assertions)]
            current_small_block_id: 0,
            pending_small_value_block: Vec::with_capacity(
                MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE,
            ),
            key_buffer: Vec::with_capacity(MAX_KEY_BLOCK_SIZE),
            filter: Some(filter),
            key_block_boundaries: Vec::with_capacity(estimated_key_blocks),
            min_hash: u64::MAX,
            max_hash: 0,
            entry_count: 0,
            flags,
            total_key_size: 0,
            total_value_size: 0,
            pending_key_total_size: 0,
            current_key_block: KeyBlockAccumulator::new(),
            #[cfg(debug_assertions)]
            finished: false,
        })
    }

    /// Whether the writer has hit any of its capacity limits and a new file
    /// should be started for further entries.
    pub fn is_full(&self, max_entries: usize, max_data_size: usize) -> bool {
        self.entry_count as usize >= max_entries
            || self.total_key_size + self.total_value_size >= max_data_size
            || !self.has_block_index_capacity()
    }

    /// Conservatively checks that the u16 block-index space can still hold all
    /// blocks written so far plus those implied by the pending state.
    fn has_block_index_capacity(&self) -> bool {
        let blocks_written = self.block_offsets.len();
        let pending_small_block = usize::from(!self.pending_small_value_block.is_empty());
        // Key blocks needed for the pending entries, estimated from both the
        // entry-count limit and the byte-size limit, whichever dominates.
        let pending_key_blocks = self
            .pending_keys
            .len()
            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
            .max(self.pending_key_total_size.div_ceil(MAX_KEY_BLOCK_SIZE))
            .max(1);
        let index_block = 1;
        let buffer = BLOCK_INDEX_CAPACITY_BUFFER;
        blocks_written + pending_small_block + pending_key_blocks + index_block + buffer
            < u16::MAX as usize
    }

    /// Adds the next entry. Entries must arrive in ascending `key_hash` order
    /// (enforced only by a `debug_assert` in `write_static_stored_file`).
    pub fn add(&mut self, entry: E) -> Result<()> {
        let key_hash = entry.key_hash();
        let key_len = entry.key_len();

        if self.entry_count == 0 {
            self.min_hash = key_hash;
        }
        // Entries are sorted, so the latest hash is always the maximum.
        self.max_hash = key_hash;
        self.entry_count += 1;

        self.filter
            .as_mut()
            .unwrap()
            .insert_fingerprint(false, key_hash)
            .expect("AMQF insert failed");

        self.total_key_size += key_len;
        self.pending_key_total_size += key_len;

        let value_ref = match entry.value() {
            EntryValue::Medium { value } => {
                // Medium values get their own (possibly compressed) block,
                // written immediately.
                self.total_value_size += value.len();
                let block_index = write_block_to_file(
                    self.file.as_mut().unwrap(),
                    &mut self.compress_buffer,
                    &mut self.block_offsets,
                    value,
                    true,
                )
                .context("Failed to write value block")?;
                ValueRef::Medium { block_index }
            }
            EntryValue::MediumRaw {
                uncompressed_size,
                checksum,
                block,
            } => {
                // Already compressed and checksummed by the caller; written
                // verbatim.
                self.total_value_size += block.len();
                let block_index = write_raw_block_to_file(
                    self.file.as_mut().unwrap(),
                    &mut self.block_offsets,
                    uncompressed_size,
                    checksum,
                    block,
                )
                .context("Failed to write compressed value block")?;
                ValueRef::Medium { block_index }
            }
            EntryValue::Small { value } => {
                // Small values are packed into a shared block whose index is
                // only known at flush time, so the key entry is recorded as
                // PendingSmall for now.
                self.total_value_size += value.len();

                let offset = self.pending_small_value_block.len() as u32;
                let size: u16 = value.len().try_into().unwrap();
                self.pending_small_value_block.extend_from_slice(value);

                // If all earlier entries are resolved, this entry (pushed
                // below) becomes the first unresolved one.
                if self.first_pending_small_index >= self.pending_keys.len() {
                    self.first_pending_small_index = self.pending_keys.len();
                }

                let value_ref = ValueRef::PendingSmall {
                    #[cfg(debug_assertions)]
                    small_block_id: self.current_small_block_id,
                    offset,
                    size,
                };

                self.push_pending_key_entry(entry, value_ref);

                if self.pending_small_value_block.len() >= MIN_SMALL_VALUE_BLOCK_SIZE {
                    self.flush_small_value_block()?;
                }

                return Ok(());
            }
            EntryValue::Inline { value } => {
                debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
                let mut data = [0u8; MAX_INLINE_VALUE_SIZE];
                data[..value.len()].copy_from_slice(value);
                ValueRef::Inline {
                    data,
                    len: value.len() as u8,
                }
            }
            EntryValue::Large { blob } => ValueRef::Blob { blob_id: blob },
            EntryValue::Deleted => ValueRef::Deleted,
        };

        self.push_pending_key_entry(entry, value_ref);
        self.try_flush_key_blocks()
    }

    fn push_pending_key_entry(&mut self, entry: E, value_ref: ValueRef) {
        self.pending_keys
            .push_back(PendingEntry { entry, value_ref });
    }

    /// Flushes completed key blocks, but only when the entry just pushed is
    /// resolved and no unresolved PendingSmall entries precede it.
    fn try_flush_key_blocks(&mut self) -> Result<()> {
        debug_assert!(!matches!(
            self.pending_keys.back().unwrap().value_ref,
            ValueRef::PendingSmall { .. }
        ));
        if self.first_pending_small_index != self.pending_keys.len() - 1 {
            return Ok(());
        }
        self.advance_boundary_to(self.pending_keys.len())
    }

    /// Feeds the resolved entries up to `new_boundary` into the key block
    /// accumulator, writing out full key blocks and dropping flushed entries
    /// from the pending queue.
    fn advance_boundary_to(&mut self, new_boundary: usize) -> Result<()> {
        let mut last_flushed_end = 0usize;
        // Key bytes iterated in this call vs. key bytes belonging to blocks
        // flushed in this call.
        // NOTE(review): keys accumulated in earlier calls (indices below
        // first_pending_small_index) are not counted here, so
        // pending_key_total_size may over-estimate after a flush; this appears
        // to affect only the conservative capacity estimate — confirm.
        let mut cumulative_key_size = 0usize;
        let mut flushed_key_size = 0usize;

        for i in self.first_pending_small_index..new_boundary {
            let entry = &self.pending_keys[i];
            let key_len = entry.entry.key_len();
            let key_hash = entry.entry.key_hash();
            let value_type = entry.value_ref.entry_type();

            if self.current_key_block.should_flush(key_len, key_hash) {
                // The accumulator holds exactly the entries since the last
                // flush, so its entry_count gives the block's end index.
                let block_end = last_flushed_end + self.current_key_block.entry_count;
                let info = self.current_key_block.flush_info();
                self.flush_key_block(last_flushed_end, block_end, info)?;
                flushed_key_size = cumulative_key_size;
                last_flushed_end = block_end;
                self.current_key_block.reset();
            }

            cumulative_key_size += key_len;
            self.current_key_block.add(key_len, key_hash, value_type);
        }

        if last_flushed_end > 0 {
            self.pending_key_total_size -= flushed_key_size;
            self.pending_keys.drain(..last_flushed_end);
        }

        // Everything up to the (shifted) boundary is now resolved.
        self.first_pending_small_index = new_boundary - last_flushed_end;
        Ok(())
    }

    /// Writes the accumulated small values as one block and patches every
    /// PendingSmall reference to point at the block's final index.
    fn flush_small_value_block(&mut self) -> Result<()> {
        if self.pending_small_value_block.is_empty() {
            return Ok(());
        }

        let block_index = write_block_to_file(
            self.file.as_mut().unwrap(),
            &mut self.compress_buffer,
            &mut self.block_offsets,
            &self.pending_small_value_block,
            true,
        )
        .context("Failed to write small value block")?;

        // Resolve PendingSmall -> Small for all entries referencing the block
        // just written.
        #[cfg(debug_assertions)]
        let flushed_id = self.current_small_block_id;
        for i in self.first_pending_small_index..self.pending_keys.len() {
            let entry = &mut self.pending_keys[i];
            if let ValueRef::PendingSmall {
                #[cfg(debug_assertions)]
                small_block_id,
                offset,
                size,
            } = entry.value_ref
            {
                #[cfg(debug_assertions)]
                debug_assert_eq!(
                    small_block_id, flushed_id,
                    "all pending small entries must reference the small value block that was just \
                     written"
                );
                entry.value_ref = ValueRef::Small {
                    block_index,
                    offset,
                    size,
                };
            }
        }

        // Everything pending is now resolved, so key blocks may be flushed.
        self.advance_boundary_to(self.pending_keys.len())?;

        #[cfg(debug_assertions)]
        {
            self.current_small_block_id += 1;
        }
        self.pending_small_value_block.clear();

        Ok(())
    }

    /// Serializes entries [start, end) of `pending_keys` into one key block,
    /// writes it, and records its first hash for the index block.
    fn flush_key_block(&mut self, start: usize, end: usize, info: KeyBlockFlushInfo) -> Result<()> {
        let entry_count = end - start;
        let has_hash = use_hash(info.max_key_len);
        let try_compress = info.max_key_len >= MIN_KEY_SIZE_FOR_COMPRESSION;

        self.key_buffer.clear();

        if let KeyBlockFormat::Fixed {
            key_len: key_size,
            value_type,
        } = info.format
        {
            // All keys share one length and value type: compact fixed layout.
            let mut builder = FixedKeyBlockBuilder::new(
                &mut self.key_buffer,
                entry_count as u32,
                has_hash,
                key_size,
                value_type,
            );
            for i in start..end {
                let pending = &self.pending_keys[i];
                builder.put(&pending.entry, &pending.value_ref, has_hash);
            }
            builder.finish();
        } else {
            let mut builder =
                KeyBlockBuilder::new(&mut self.key_buffer, entry_count as u32, has_hash);

            for i in start..end {
                let pending = &self.pending_keys[i];
                builder.put(&pending.entry, &pending.value_ref, has_hash);
            }

            builder.finish();
        }

        let first_hash = self.pending_keys[start].entry.key_hash();
        let block_index = write_block_to_file(
            self.file.as_mut().unwrap(),
            &mut self.compress_buffer,
            &mut self.block_offsets,
            &self.key_buffer,
            try_compress,
        )
        .context("Failed to write key block")?;
        self.key_block_boundaries.push((first_hash, block_index));

        Ok(())
    }

    /// Finishes the file: flushes the remaining small values and key blocks,
    /// writes the index block and the trailing offset table, and returns the
    /// metadata together with the underlying file.
    ///
    /// Panics if no entries were ever added.
    pub fn close(mut self) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
        #[cfg(debug_assertions)]
        {
            self.finished = true;
        }

        self.flush_small_value_block()?;

        self.flush_remaining_key_blocks()?;

        assert!(
            !self.key_block_boundaries.is_empty(),
            "StreamingSstWriter::close() called with no entries"
        );

        let mut file = self.file.take().unwrap();

        // The first key block's index lives in the index header, so the index
        // body has one entry fewer than there are key blocks.
        let index_entry_count: u16 = (self.key_block_boundaries.len() - 1)
            .try_into()
            .expect("Index entries count overflow");
        let index_block_size: usize =
            INDEX_BLOCK_HEADER_SIZE + index_entry_count as usize * INDEX_BLOCK_ENTRY_SIZE;
        let mut index_buf = Vec::with_capacity(index_block_size);
        {
            let first_block = self.key_block_boundaries[0].1;
            let mut index_block = IndexBlockBuilder::new(&mut index_buf, first_block);
            for &(hash, block) in &self.key_block_boundaries[1..] {
                index_block.put(hash, block);
            }
        }
        // The index block is stored uncompressed (uncompressed_size == 0).
        let index_checksum = checksum_block(&index_buf);
        write_raw_block_to_file(
            &mut file,
            &mut self.block_offsets,
            0,
            index_checksum,
            &index_buf,
        )
        .context("Failed to write index block")?;

        // Trailing table of cumulative block end offsets.
        for offset in &self.block_offsets {
            file.write_u32::<BE>(*offset)
                .context("Failed to write block offset")?;
        }

        let block_count: u16 = self
            .block_offsets
            .len()
            .try_into()
            .expect("Block count overflow");

        let mut filter = self.filter.take().unwrap();
        filter.shrink_to_fit();

        let amqf =
            turbo_bincode_encode(&AmqfBincodeWrapper(filter)).expect("AMQF serialization failed");

        // File size = end of last block (offsets include headers) + the
        // offset table appended above.
        let last_block_end = self.block_offsets.last().copied().unwrap_or_default() as u64;
        let offset_table_size = block_count as u64 * size_of::<u32>() as u64;
        let file_size = last_block_end + offset_table_size;

        let meta = StaticSortedFileBuilderMeta {
            min_hash: self.min_hash,
            max_hash: self.max_hash,
            amqf: Cow::Owned(amqf.into_vec()),
            block_count,
            size: file_size,
            flags: self.flags,
            entries: self.entry_count,
        };

        Ok((meta, file.into_inner()?))
    }

    /// Writes key blocks for all still-pending entries at close time.
    ///
    /// Uses a fresh accumulator over the whole remaining queue instead of
    /// `current_key_block`. NOTE(review): since every flush resets
    /// `current_key_block`, the remaining queue should equal its accumulated
    /// contents, making this regrouping identical — confirm.
    fn flush_remaining_key_blocks(&mut self) -> Result<()> {
        if self.pending_keys.is_empty() {
            return Ok(());
        }

        debug_assert_eq!(
            self.first_pending_small_index,
            self.pending_keys.len(),
            "expected no unresolved PendingSmall entries after flush_small_value_block"
        );

        let total = self.pending_keys.len();
        let mut block_start = 0;
        let mut acc = KeyBlockAccumulator::new();

        for i in 0..total {
            let entry = &self.pending_keys[i];
            let key_len = entry.entry.key_len();
            let key_hash = entry.entry.key_hash();
            let value_type = entry.value_ref.entry_type();

            if acc.should_flush(key_len, key_hash) {
                self.flush_key_block(block_start, i, acc.flush_info())?;
                block_start = i;
                acc.reset();
            }

            acc.add(key_len, key_hash, value_type);
        }

        // Write the final (possibly partial) block.
        if block_start < total {
            self.flush_key_block(block_start, total, acc.flush_info())?;
        }

        self.pending_keys.clear();
        Ok(())
    }
}
1032
/// Debug-only guard: catches writers that were dropped without `close()`,
/// which would silently leave a truncated file behind.
#[cfg(debug_assertions)]
impl<E: Entry> Drop for StreamingSstWriter<E> {
    fn drop(&mut self) {
        // Don't double-panic while unwinding from another panic.
        if !std::thread::panicking() {
            assert!(
                self.finished || self.entry_count == 0,
                "StreamingSstWriter dropped without calling close()"
            );
        }
    }
}
1045
/// Serializer for variable-layout key blocks: a table of per-entry headers
/// (type byte + 24-bit payload offset) followed by the entry payloads.
struct KeyBlockBuilder<'l> {
    /// Index of the next entry whose header slot `put` will fill in.
    current_entry: usize,
    /// Byte length of the header area (block header + entry header table);
    /// payload offsets are relative to this point.
    header_size: usize,
    buffer: &'l mut Vec<u8>,
}

/// Block type byte + 24-bit entry count.
const KEY_BLOCK_HEADER_SIZE: usize = 4;
1062
impl<'l> KeyBlockBuilder<'l> {
    /// Writes the block header and a zeroed entry-header table into `buffer`;
    /// `put` patches the table as entries are appended.
    fn new(buffer: &'l mut Vec<u8>, entry_count: u32, has_hash: bool) -> Self {
        // The entry count is serialized as 24 bits.
        debug_assert!(entry_count < (1 << 24));

        const ESTIMATED_KEY_SIZE: usize = 16;
        buffer.reserve(entry_count as usize * ESTIMATED_KEY_SIZE);
        let block_type = if has_hash {
            BLOCK_TYPE_KEY_WITH_HASH
        } else {
            BLOCK_TYPE_KEY_NO_HASH
        };
        buffer.write_u8(block_type).unwrap();
        buffer.write_u24::<BE>(entry_count).unwrap();
        // Reserve one u32 header slot per entry, patched by write_entry_header.
        for _ in 0..entry_count {
            buffer.write_u32::<BE>(0).unwrap();
        }
        Self {
            current_entry: 0,
            header_size: buffer.len(),
            buffer,
        }
    }

    /// Patches the current entry's header slot: low 24 bits are the payload
    /// start offset (relative to the end of the header area), high 8 bits the
    /// entry type. Must run before the entry's payload is appended.
    fn write_entry_header(&mut self, entry_type: EntryType) {
        let pos = self.buffer.len() - self.header_size;
        let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4;
        let header = (pos as u32) | ((entry_type.0 as u32) << 24);
        BE::write_u32(&mut self.buffer[header_offset..header_offset + 4], header);
    }

    /// Appends one entry: optional key hash, key bytes, then the value ref.
    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
        self.write_entry_header(value_ref.entry_type());
        if has_hash {
            self.buffer
                .extend_from_slice(&entry.key_hash().to_be_bytes());
        }
        entry.write_key_to(self.buffer);
        value_ref.write_value_to(self.buffer);
        self.current_entry += 1;
    }

    /// Returns the underlying buffer; the block is complete once all
    /// `entry_count` entries have been `put`.
    fn finish(self) -> &'l mut Vec<u8> {
        self.buffer
    }
}
1112
/// Block type byte + 24-bit entry count + key size byte + value type byte.
const FIXED_KEY_BLOCK_HEADER_SIZE: usize = 6;

/// Serializer for fixed-layout key blocks, where every entry shares the same
/// key length and value type and therefore has a constant stride, so no
/// per-entry header table is needed.
struct FixedKeyBlockBuilder<'l> {
    buffer: &'l mut Vec<u8>,
}
1126
1127impl<'l> FixedKeyBlockBuilder<'l> {
1128 fn new(
1129 buffer: &'l mut Vec<u8>,
1130 entry_count: u32,
1131 has_hash: bool,
1132 key_size: u8,
1133 value_type: EntryType,
1134 ) -> Self {
1135 let hash_len: usize = if has_hash { 8 } else { 0 };
1136 let val_size = value_type_val_size(value_type);
1137 let stride = hash_len + key_size as usize + val_size;
1138 buffer.reserve(FIXED_KEY_BLOCK_HEADER_SIZE + entry_count as usize * stride);
1139
1140 let block_type = if has_hash {
1141 BLOCK_TYPE_FIXED_KEY_WITH_HASH
1142 } else {
1143 BLOCK_TYPE_FIXED_KEY_NO_HASH
1144 };
1145 buffer.extend_from_slice(&[
1146 block_type,
1147 (entry_count >> 16) as u8,
1148 (entry_count >> 8) as u8,
1149 entry_count as u8,
1150 key_size,
1151 value_type.0,
1152 ]);
1153
1154 Self { buffer }
1155 }
1156
1157 fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
1159 if has_hash {
1160 self.buffer
1161 .extend_from_slice(&entry.key_hash().to_be_bytes());
1162 }
1163 entry.write_key_to(self.buffer);
1164 value_ref.write_value_to(self.buffer);
1165 }
1166
1167 fn finish(self) -> &'l mut Vec<u8> {
1168 self.buffer
1169 }
1170}
1171
/// Serialized size in bytes of a value reference of the given entry type.
/// Inline entries encode their payload length in the type byte itself,
/// offset by `KEY_BLOCK_ENTRY_TYPE_INLINE_MIN`.
fn value_type_val_size(ty: EntryType) -> usize {
    match ty.0 {
        KEY_BLOCK_ENTRY_TYPE_SMALL => SMALL_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_MEDIUM => MEDIUM_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_BLOB => BLOB_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_DELETED => DELETED_VALUE_REF_SIZE,
        ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
            (ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize
        }
        _ => panic!("Invalid key block entry type: {:?}", ty),
    }
}
1188
/// Serializer for the index block, which maps the first key hash of each key
/// block (after the first) to that block's index.
struct IndexBlockBuilder<W: Write> {
    writer: W,
}

/// Each index entry: key hash (u64) + key block index (u16).
const INDEX_BLOCK_ENTRY_SIZE: usize = size_of::<u64>() + size_of::<u16>();

/// Index header: block type byte + first key block index (u16).
const INDEX_BLOCK_HEADER_SIZE: usize = size_of::<u8>() + size_of::<u16>();
1203
1204impl<W: Write> IndexBlockBuilder<W> {
1205 fn new(mut writer: W, first_block: u16) -> Self {
1208 writer.write_u8(BLOCK_TYPE_INDEX).unwrap();
1209 writer.write_u16::<BE>(first_block).unwrap();
1210 Self { writer }
1211 }
1212
1213 fn put(&mut self, hash: u64, block: u16) {
1215 self.writer.write_u64::<BE>(hash).unwrap();
1216 self.writer.write_u16::<BE>(block).unwrap();
1217 }
1218}
1219
1220#[cfg(test)]
1221mod tests {
1222 use std::hash::BuildHasherDefault;
1223
1224 use quick_cache::sync::Cache;
1225 use rustc_hash::FxHasher;
1226
1227 use super::*;
1228 use crate::{
1229 key::hash_key,
1230 lookup_entry::LookupValue,
1231 static_sorted_file::{
1232 BlockWeighter, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData,
1233 },
1234 };
1235
1236 type TestBlockCache =
1237 Cache<(u32, u16), crate::ArcBytes, BlockWeighter, BuildHasherDefault<FxHasher>>;
1238
1239 fn make_cache() -> TestBlockCache {
1240 TestBlockCache::with(
1241 100,
1242 4 * 1024 * 1024,
1243 Default::default(),
1244 Default::default(),
1245 Default::default(),
1246 )
1247 }
1248
1249 struct TestEntry {
1251 key: Vec<u8>,
1252 hash: u64,
1253 value_kind: TestValueKind,
1254 }
1255
1256 enum TestValueKind {
1257 Inline(Vec<u8>),
1258 Small(Vec<u8>),
1259 Medium(Vec<u8>),
1260 MediumRaw(Vec<u8>),
1262 Blob(u32),
1263 Deleted,
1264 }
1265
1266 impl TestEntry {
1267 fn new(key: &[u8], value_kind: TestValueKind) -> Self {
1268 let key = key.to_vec();
1269 let hash = hash_key(&key);
1270 Self {
1271 key,
1272 hash,
1273 value_kind,
1274 }
1275 }
1276
1277 fn small(key: &[u8], value: &[u8]) -> Self {
1278 Self::new(key, TestValueKind::Small(value.to_vec()))
1279 }
1280
1281 fn inline(key: &[u8], value: &[u8]) -> Self {
1282 debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
1283 Self::new(key, TestValueKind::Inline(value.to_vec()))
1284 }
1285
1286 fn medium(key: &[u8], value: &[u8]) -> Self {
1287 Self::new(key, TestValueKind::Medium(value.to_vec()))
1288 }
1289
1290 fn blob(key: &[u8], blob_id: u32) -> Self {
1291 Self::new(key, TestValueKind::Blob(blob_id))
1292 }
1293
1294 fn deleted(key: &[u8]) -> Self {
1295 Self::new(key, TestValueKind::Deleted)
1296 }
1297
1298 fn medium_raw(key: &[u8], value: &[u8]) -> Self {
1299 Self::new(key, TestValueKind::MediumRaw(value.to_vec()))
1300 }
1301
1302 fn expected_value(&self) -> Option<&[u8]> {
1303 match &self.value_kind {
1304 TestValueKind::Inline(v)
1305 | TestValueKind::Small(v)
1306 | TestValueKind::Medium(v)
1307 | TestValueKind::MediumRaw(v) => Some(v),
1308 _ => None,
1309 }
1310 }
1311 }
1312
1313 impl Entry for TestEntry {
1314 fn key_hash(&self) -> u64 {
1315 self.hash
1316 }
1317
1318 fn key_len(&self) -> usize {
1319 self.key.len()
1320 }
1321
1322 fn write_key_to(&self, buf: &mut Vec<u8>) {
1323 buf.extend_from_slice(&self.key);
1324 }
1325
1326 fn value(&self) -> EntryValue<'_> {
1327 match &self.value_kind {
1328 TestValueKind::Inline(v) => EntryValue::Inline { value: v },
1329 TestValueKind::Small(v) => EntryValue::Small { value: v },
1330 TestValueKind::Medium(v) => EntryValue::Medium { value: v },
1331 TestValueKind::MediumRaw(v) => EntryValue::MediumRaw {
1332 uncompressed_size: 0,
1334 checksum: checksum_block(v),
1335 block: v,
1336 },
1337 TestValueKind::Blob(id) => EntryValue::Large { blob: *id },
1338 TestValueKind::Deleted => EntryValue::Deleted,
1339 }
1340 }
1341 }
1342
1343 fn sort_entries(entries: &mut [TestEntry]) {
1345 entries.sort_by_key(|e| e.hash);
1346 }
1347
1348 fn open_sst(
1350 dir: &Path,
1351 seq: u32,
1352 meta: &StaticSortedFileBuilderMeta<'_>,
1353 ) -> Result<StaticSortedFile> {
1354 StaticSortedFile::open(
1355 dir,
1356 StaticSortedFileMetaData {
1357 sequence_number: seq,
1358 block_count: meta.block_count,
1359 },
1360 )
1361 }
1362
1363 fn write_sst(
1365 dir: &Path,
1366 seq: u32,
1367 entries: &[TestEntry],
1368 flags: MetaEntryFlags,
1369 ) -> Result<StaticSortedFileBuilderMeta<'static>> {
1370 let sst_path = dir.join(format!("{seq:08}.sst"));
1371 let mut writer = StreamingSstWriter::new(&sst_path, flags, entries.len() as u64)?;
1372 for entry in entries {
1373 writer.add(entry)?;
1374 }
1375 let (meta, _file) = writer.close()?;
1376 Ok(meta)
1377 }
1378
1379 fn assert_lookup(
1381 sst: &StaticSortedFile,
1382 entry: &TestEntry,
1383 kc: &TestBlockCache,
1384 vc: &TestBlockCache,
1385 ) -> Result<()> {
1386 let result = sst.lookup::<_, false>(entry.hash, &entry.key, kc, vc)?;
1387 match (&entry.value_kind, result) {
1388 (_, SstLookupResult::Found(values))
1389 if values.len() == 1 && matches!(values[0], LookupValue::Slice { .. }) =>
1390 {
1391 let LookupValue::Slice { value } = &values[0] else {
1392 unreachable!()
1393 };
1394 let expected = entry
1395 .expected_value()
1396 .expect("Got Slice but entry has no value");
1397 assert_eq!(
1398 value.as_ref(),
1399 expected,
1400 "value mismatch for key {:?}",
1401 std::str::from_utf8(&entry.key)
1402 );
1403 }
1404 (TestValueKind::Blob(expected_id), SstLookupResult::Found(values))
1405 if values.len() == 1 && matches!(values[0], LookupValue::Blob { .. }) =>
1406 {
1407 let LookupValue::Blob { sequence_number } = &values[0] else {
1408 unreachable!()
1409 };
1410 assert_eq!(*sequence_number, *expected_id);
1411 }
1412 (TestValueKind::Deleted, SstLookupResult::Found(values))
1413 if values.len() == 1 && matches!(values[0], LookupValue::Deleted) => {}
1414 _ => {
1415 panic!(
1416 "Unexpected lookup result for key {:?}",
1417 std::str::from_utf8(&entry.key)
1418 );
1419 }
1420 }
1421 Ok(())
1422 }
1423
1424 #[test]
1425 fn single_inline_entry() -> Result<()> {
1426 let dir = tempfile::tempdir()?;
1427 let mut entries = vec![TestEntry::inline(b"key1", b"val1")];
1428 sort_entries(&mut entries);
1429
1430 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1431 assert_eq!(meta.entries, 1);
1432
1433 let sst = open_sst(dir.path(), 1, &meta)?;
1434 let kc = make_cache();
1435 let vc = make_cache();
1436 assert_lookup(&sst, &entries[0], &kc, &vc)?;
1437 Ok(())
1438 }
1439
1440 #[test]
1441 fn single_small_entry() -> Result<()> {
1442 let dir = tempfile::tempdir()?;
1443 let value = vec![0xAB; 100]; let mut entries = vec![TestEntry::small(b"skey", &value)];
1445 sort_entries(&mut entries);
1446
1447 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1448 assert_eq!(meta.entries, 1);
1449
1450 let sst = open_sst(dir.path(), 1, &meta)?;
1451 let kc = make_cache();
1452 let vc = make_cache();
1453 assert_lookup(&sst, &entries[0], &kc, &vc)?;
1454 Ok(())
1455 }
1456
1457 #[test]
1458 fn single_medium_entry() -> Result<()> {
1459 let dir = tempfile::tempdir()?;
1460 let value = vec![0xCD; 8192]; let mut entries = vec![TestEntry::medium(b"mkey", &value)];
1462 sort_entries(&mut entries);
1463
1464 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1465 assert_eq!(meta.entries, 1);
1466
1467 let sst = open_sst(dir.path(), 1, &meta)?;
1468 let kc = make_cache();
1469 let vc = make_cache();
1470 assert_lookup(&sst, &entries[0], &kc, &vc)?;
1471 Ok(())
1472 }
1473
1474 #[test]
1475 fn single_blob_entry() -> Result<()> {
1476 let dir = tempfile::tempdir()?;
1477 let mut entries = vec![TestEntry::blob(b"bkey", 42)];
1478 sort_entries(&mut entries);
1479
1480 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1481 assert_eq!(meta.entries, 1);
1482
1483 let sst = open_sst(dir.path(), 1, &meta)?;
1484 let kc = make_cache();
1485 let vc = make_cache();
1486 assert_lookup(&sst, &entries[0], &kc, &vc)?;
1487 Ok(())
1488 }
1489
1490 #[test]
1491 fn single_deleted_entry() -> Result<()> {
1492 let dir = tempfile::tempdir()?;
1493 let mut entries = vec![TestEntry::deleted(b"dkey")];
1494 sort_entries(&mut entries);
1495
1496 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1497 assert_eq!(meta.entries, 1);
1498
1499 let sst = open_sst(dir.path(), 1, &meta)?;
1500 let kc = make_cache();
1501 let vc = make_cache();
1502 assert_lookup(&sst, &entries[0], &kc, &vc)?;
1503 Ok(())
1504 }
1505
1506 #[test]
1507 fn many_small_values() -> Result<()> {
1508 let dir = tempfile::tempdir()?;
1509 let count = 200;
1512 let mut entries: Vec<TestEntry> = (0..count)
1513 .map(|i| {
1514 let key = format!("key-{i:04}");
1515 let value = vec![(i & 0xFF) as u8; 200];
1516 TestEntry::small(key.as_bytes(), &value)
1517 })
1518 .collect();
1519 sort_entries(&mut entries);
1520
1521 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1522 assert_eq!(meta.entries, count as u64);
1523
1524 let sst = open_sst(dir.path(), 1, &meta)?;
1525 let kc = make_cache();
1526 let vc = make_cache();
1527
1528 for entry in &entries {
1529 assert_lookup(&sst, entry, &kc, &vc)?;
1530 }
1531 Ok(())
1532 }
1533
1534 #[test]
1535 fn many_medium_values() -> Result<()> {
1536 let dir = tempfile::tempdir()?;
1537 let count = 50;
1538 let mut entries: Vec<TestEntry> = (0..count)
1539 .map(|i| {
1540 let key = format!("mkey-{i:04}");
1541 let value = vec![(i & 0xFF) as u8; 8192];
1542 TestEntry::medium(key.as_bytes(), &value)
1543 })
1544 .collect();
1545 sort_entries(&mut entries);
1546
1547 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1548 assert_eq!(meta.entries, count as u64);
1549
1550 let sst = open_sst(dir.path(), 1, &meta)?;
1551 let kc = make_cache();
1552 let vc = make_cache();
1553
1554 for entry in &entries {
1555 assert_lookup(&sst, entry, &kc, &vc)?;
1556 }
1557 Ok(())
1558 }
1559
1560 #[test]
1561 fn mixed_value_types() -> Result<()> {
1562 let dir = tempfile::tempdir()?;
1563 let mut entries = vec![
1564 TestEntry::inline(b"a-inline", b"tiny"),
1565 TestEntry::small(b"b-small", &[0x11; 200]),
1566 TestEntry::medium(b"c-medium", &[0x22; 8192]),
1567 TestEntry::blob(b"d-blob", 99),
1568 TestEntry::deleted(b"e-deleted"),
1569 TestEntry::small(b"f-small2", &[0x33; 300]),
1570 TestEntry::inline(b"g-inline2", b"mini"),
1571 TestEntry::medium(b"h-medium2", &[0x44; 16384]),
1572 ];
1573 sort_entries(&mut entries);
1574
1575 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1576 assert_eq!(meta.entries, 8);
1577
1578 let sst = open_sst(dir.path(), 1, &meta)?;
1579 let kc = make_cache();
1580 let vc = make_cache();
1581
1582 for entry in &entries {
1583 assert_lookup(&sst, entry, &kc, &vc)?;
1584 }
1585 Ok(())
1586 }
1587
1588 #[test]
1589 fn is_full_entry_count_limit() {
1590 let dir = tempfile::tempdir().unwrap();
1591 let sst_path = dir.path().join("test.sst");
1592 let mut writer =
1593 StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();
1594
1595 let max_entries = 50;
1596 for i in 0..max_entries {
1597 let key = format!("k{i:06}");
1598 let entry = TestEntry::inline(key.as_bytes(), &[0; 4]);
1599 writer.add(entry).unwrap();
1600 }
1601
1602 assert_eq!(writer.entry_count, max_entries as u64);
1603 assert!(
1604 writer.is_full(max_entries, usize::MAX),
1605 "Should be full when entry count reaches max_entries"
1606 );
1607 assert!(
1608 !writer.is_full(max_entries + 1, usize::MAX),
1609 "Should not be full when limit is higher"
1610 );
1611 writer.close().unwrap();
1612 }
1613
1614 #[test]
1615 fn is_full_data_size_limit() {
1616 let dir = tempfile::tempdir().unwrap();
1617 let sst_path = dir.path().join("test.sst");
1618 let mut writer =
1619 StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();
1620
1621 let value = vec![0u8; 1000];
1622 for i in 0..10 {
1623 let key = format!("k{i:06}");
1624 let entry = TestEntry::small(key.as_bytes(), &value);
1625 writer.add(entry).unwrap();
1626 }
1627
1628 let total = writer.total_key_size + writer.total_value_size;
1629 assert!(total > 10_000, "total data should exceed 10KB");
1630 assert!(writer.is_full(usize::MAX, total - 1));
1631 assert!(!writer.is_full(usize::MAX, total + 1));
1632 writer.close().unwrap();
1633 }
1634
    /// The batch writer (`write_static_stored_file`) and the streaming
    /// writer (`StreamingSstWriter`) must produce equivalent files for the
    /// same sorted input: identical meta fields and identical lookup results
    /// for every key.
    #[test]
    fn write_static_stored_file_matches_streaming() -> Result<()> {
        let dir = tempfile::tempdir()?;

        // 100 entries cycling through inline / small / medium value kinds,
        // with a per-entry fill byte so value mixups would be caught.
        let mut entries: Vec<TestEntry> = (0..100)
            .map(|i| {
                let key = format!("rkey-{i:04}");
                if i % 3 == 0 {
                    TestEntry::inline(key.as_bytes(), &[(i & 0xFF) as u8; 4])
                } else if i % 3 == 1 {
                    TestEntry::small(key.as_bytes(), &[(i & 0xFF) as u8; 200])
                } else {
                    TestEntry::medium(key.as_bytes(), &[(i & 0xFF) as u8; 8192])
                }
            })
            .collect();
        sort_entries(&mut entries);

        // Write once with the batch API (sequence number 1)...
        let batch_path = dir.path().join("00000001.sst");
        let (meta1, _) =
            write_static_stored_file(&entries, &batch_path, MetaEntryFlags::default())?;

        // ...and once entry-by-entry with the streaming API (sequence number 2).
        let streaming_path = dir.path().join("00000002.sst");
        let mut writer = StreamingSstWriter::new(
            &streaming_path,
            MetaEntryFlags::default(),
            entries.len() as u64,
        )?;
        for entry in &entries {
            writer.add(entry)?;
        }
        let (meta2, _) = writer.close()?;

        // Both writers must agree on the file-level metadata.
        assert_eq!(meta1.entries, meta2.entries);
        assert_eq!(meta1.min_hash, meta2.min_hash);
        assert_eq!(meta1.max_hash, meta2.max_hash);
        assert_eq!(meta1.block_count, meta2.block_count);

        let sst1 = StaticSortedFile::open(
            dir.path(),
            StaticSortedFileMetaData {
                sequence_number: 1,
                block_count: meta1.block_count,
            },
        )?;
        let sst2 = StaticSortedFile::open(
            dir.path(),
            StaticSortedFileMetaData {
                sequence_number: 2,
                block_count: meta2.block_count,
            },
        )?;
        let kc = make_cache();
        let vc = make_cache();

        // Every key must resolve to the same value kind and payload in both
        // files; only single-value Found results are expected here.
        for entry in &entries {
            let r1 = sst1.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
            let r2 = sst2.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
            match (&r1, &r2) {
                (SstLookupResult::Found(v1), SstLookupResult::Found(v2))
                    if v1.len() == 1 && v2.len() == 1 =>
                {
                    match (&v1[0], &v2[0]) {
                        (
                            LookupValue::Slice { value: val1 },
                            LookupValue::Slice { value: val2 },
                        ) => {
                            assert_eq!(
                                val1.as_ref(),
                                val2.as_ref(),
                                "Value mismatch for key {:?}",
                                std::str::from_utf8(&entry.key)
                            );
                        }
                        (LookupValue::Deleted, LookupValue::Deleted) => {}
                        (
                            LookupValue::Blob {
                                sequence_number: s1,
                            },
                            LookupValue::Blob {
                                sequence_number: s2,
                            },
                        ) => {
                            assert_eq!(s1, s2);
                        }
                        // Same key resolved to different value kinds.
                        _ => panic!(
                            "Mismatched results for key {:?}",
                            std::str::from_utf8(&entry.key)
                        ),
                    }
                }
                // NotFound, or multi-value results — neither is expected.
                _ => panic!(
                    "Mismatched results for key {:?}",
                    std::str::from_utf8(&entry.key)
                ),
            }
        }
        Ok(())
    }
1738
1739 #[test]
1740 #[should_panic(expected = "StreamingSstWriter::close() called with no entries")]
1741 fn close_empty_writer_panics() {
1742 let dir = tempfile::tempdir().unwrap();
1743 let sst_path = dir.path().join("empty.sst");
1744 let writer =
1745 StreamingSstWriter::<TestEntry>::new(&sst_path, MetaEntryFlags::default(), 0).unwrap();
1746 writer.close().unwrap();
1747 }
1748
1749 #[test]
1750 fn key_block_boundary_at_max_entries() -> Result<()> {
1751 let dir = tempfile::tempdir()?;
1752 let count = MAX_KEY_BLOCK_ENTRIES + 1;
1753 let mut entries: Vec<TestEntry> = (0..count)
1754 .map(|i| {
1755 let key = format!("boundary-{i:06}");
1756 TestEntry::inline(key.as_bytes(), &[0u8; 4])
1757 })
1758 .collect();
1759 sort_entries(&mut entries);
1760
1761 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1762 assert_eq!(meta.entries, count as u64);
1763 assert!(
1765 meta.block_count >= 3,
1766 "expected at least 2 key blocks + 1 index block"
1767 );
1768
1769 let sst = open_sst(dir.path(), 1, &meta)?;
1770 let kc = make_cache();
1771 let vc = make_cache();
1772 for entry in &entries {
1773 assert_lookup(&sst, entry, &kc, &vc)?;
1774 }
1775 Ok(())
1776 }
1777
1778 #[test]
1779 fn single_medium_raw_entry() -> Result<()> {
1780 let dir = tempfile::tempdir()?;
1781 let value = vec![0xBE; 8192];
1782 let mut entries = vec![TestEntry::medium_raw(b"rkey", &value)];
1783 sort_entries(&mut entries);
1784
1785 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1786 assert_eq!(meta.entries, 1);
1787
1788 let sst = open_sst(dir.path(), 1, &meta)?;
1789 let kc = make_cache();
1790 let vc = make_cache();
1791 assert_lookup(&sst, &entries[0], &kc, &vc)?;
1792 Ok(())
1793 }
1794
1795 fn corrupt_sst_byte(dir: &Path, seq: u32, pos: u64) {
1797 use std::io::{Seek, SeekFrom, Write as _};
1798
1799 let sst_path = dir.join(format!("{seq:08}.sst"));
1800 let file_bytes = std::fs::read(&sst_path).unwrap();
1801 let original = file_bytes[pos as usize];
1802 let mut file = std::fs::OpenOptions::new()
1803 .write(true)
1804 .open(&sst_path)
1805 .unwrap();
1806 file.seek(SeekFrom::Start(pos)).unwrap();
1807 file.write_all(&[original ^ 0xFF]).unwrap();
1808 file.sync_all().unwrap();
1809 }
1810
1811 fn assert_corruption_detected(
1813 dir: &Path,
1814 seq: u32,
1815 meta: &StaticSortedFileBuilderMeta<'_>,
1816 entries: &[TestEntry],
1817 ) {
1818 let sst = open_sst(dir, seq, meta).unwrap();
1819 let kc = make_cache();
1820 let vc = make_cache();
1821 match sst.lookup::<_, false>(entries[0].hash, &entries[0].key, &kc, &vc) {
1822 Err(err) => {
1823 let msg = format!("{err}");
1824 assert!(
1825 msg.contains("corruption"),
1826 "Expected corruption error, got: {msg}"
1827 );
1828 }
1829 Ok(_) => panic!("Expected checksum error, but lookup succeeded"),
1830 }
1831 }
1832
1833 #[test]
1834 fn checksum_detects_corrupted_compressed_block() {
1835 let dir = tempfile::tempdir().unwrap();
1836 let value = vec![0xCD; 8192];
1838 let entries = vec![TestEntry::medium(b"mkey", &value)];
1839
1840 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();
1841
1842 corrupt_sst_byte(dir.path(), 1, 4);
1845 assert_corruption_detected(dir.path(), 1, &meta, &entries);
1846 }
1847
1848 #[test]
1849 fn checksum_detects_corrupted_uncompressed_block() {
1850 let dir = tempfile::tempdir().unwrap();
1851 let entries = vec![TestEntry::inline(b"key1", b"val1")];
1853
1854 let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();
1855
1856 corrupt_sst_byte(dir.path(), 1, BLOCK_HEADER_SIZE as u64 + 1);
1858 assert_corruption_detected(dir.path(), 1, &meta, &entries);
1859 }
1860}