// turbo_persistence/static_sorted_file_builder.rs

1use std::{
2    borrow::Cow,
3    collections::VecDeque,
4    fs::File,
5    io::{BufWriter, Write},
6    path::Path,
7};
8
9use anyhow::{Context, Result};
10use byteorder::{BE, ByteOrder, WriteBytesExt};
11
12use crate::{
13    compression::{checksum_block, compress_into_buffer},
14    constants::{MAX_INLINE_VALUE_SIZE, MAX_SMALL_VALUE_SIZE, MIN_SMALL_VALUE_BLOCK_SIZE},
15    meta_file::MetaEntryFlags,
16    static_sorted_file::{
17        BLOB_VALUE_REF_SIZE, BLOCK_TYPE_FIXED_KEY_NO_HASH, BLOCK_TYPE_FIXED_KEY_WITH_HASH,
18        BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY_NO_HASH, BLOCK_TYPE_KEY_WITH_HASH, DELETED_VALUE_REF_SIZE,
19        KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, KEY_BLOCK_ENTRY_TYPE_INLINE_MIN,
20        KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, MEDIUM_VALUE_REF_SIZE,
21        SMALL_VALUE_REF_SIZE,
22    },
23};
24
/// Size of the per-block header on disk: 4 bytes uncompressed_size + 4 bytes CRC32 checksum.
pub const BLOCK_HEADER_SIZE: usize = 8;

/// The maximum number of entries that should go into a single key block.
/// Derived from the block size budget divided by the worst-case per-entry overhead.
const MAX_KEY_BLOCK_ENTRIES: usize = MAX_KEY_BLOCK_SIZE / KEY_BLOCK_ENTRY_META_OVERHEAD;
/// The maximum bytes that should go into a single key block
// Note this must fit into 3 bytes length
const MAX_KEY_BLOCK_SIZE: usize = 16 * 1024;
/// Overhead of bytes that should be counted for entries in a key block in addition to the key size.
/// This covers the worst case (small values):
/// - 1 byte type (key block header)
/// - 3 bytes position (key block header)
/// - 8 bytes hash (optional, but unknown at collection time)
/// - 2 bytes block index
/// - 2 bytes size
/// - 4 bytes position in block
const KEY_BLOCK_ENTRY_META_OVERHEAD: usize = 20;
/// The aimed false positive rate for the AMQF
const AMQF_FALSE_POSITIVE_RATE: f64 = 0.01;
/// Assumed average small value size for pre-allocation estimates.
/// Intentionally conservative (small values range from MAX_INLINE_VALUE_SIZE+1 to
/// MAX_SMALL_VALUE_SIZE = 4096): a low estimate over-counts value blocks, which is
/// preferable to under-allocating vectors.
const AVG_SMALL_VALUE_SIZE: usize = 64;

/// Safety margin for block index capacity estimation in
/// [`StreamingSstWriter::has_block_index_capacity`]. Accounts for rounding in the entry-count and
/// byte-size based estimates of pending key blocks.
const BLOCK_INDEX_CAPACITY_BUFFER: usize = 16;

/// Minimum key size (in bytes) for attempting LZ4 compression on key blocks.
///
/// Keys are sorted by hash, so we should not expect correlation in the data between nearby keys in
/// a block. For small keys (below this threshold), compression is unlikely to be able to exploit
/// patterns and only wastes CPU time. We skip the compression attempt entirely in this case.
const MIN_KEY_SIZE_FOR_COMPRESSION: usize = 16;

/// Maximum key length that can use fixed-size key block layout.
///
/// The on-disk fixed-key header stores the key size as a single byte, so keys longer than this
/// fall back to variable-size layout.
const MAX_FIXED_KEY_LEN: usize = u8::MAX as usize;
67
/// Newtype for the key block entry type byte.
///
/// This encodes what kind of value reference an entry has (small, medium, blob, deleted, or
/// inline with embedded length). See `KEY_BLOCK_ENTRY_TYPE_*` constants.
/// Wrapping the raw `u8` keeps the type byte from being mixed up with other byte-sized values.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EntryType(u8);
74
/// Tracks whether a key block's entries are uniform enough for fixed-size layout.
///
/// State transitions:
/// - `Unknown` → first entry → `Fixed { key_len, value_type }`
/// - `Fixed` + matching entry → stays `Fixed`
/// - `Fixed` + mismatched key_len or value_type → `Variable`
/// - `Variable` → stays `Variable`
///
/// Once `Variable` is reached, the block can never return to `Fixed`.
#[derive(Clone, Copy)]
enum KeyBlockFormat {
    /// No entries yet — format undetermined.
    Unknown,
    /// All entries so far have uniform key length and value type.
    Fixed { key_len: u8, value_type: EntryType },
    /// Entries have mixed key lengths or value types; must use offset table.
    Variable,
}
91
92impl KeyBlockFormat {
93    /// Updates the format after seeing an entry with the given key length and value type.
94    ///
95    /// A `Fixed` state is only reachable when all entries have matching key length and value type,
96    /// and the key length fits in a u8 (required by the on-disk header).
97    fn update(&mut self, key_len: usize, value_type: EntryType) {
98        *self = match *self {
99            KeyBlockFormat::Unknown => {
100                if key_len <= MAX_FIXED_KEY_LEN {
101                    KeyBlockFormat::Fixed {
102                        key_len: key_len as u8,
103                        value_type,
104                    }
105                } else {
106                    KeyBlockFormat::Variable
107                }
108            }
109            KeyBlockFormat::Fixed {
110                key_len: k,
111                value_type: v,
112            } if k as usize == key_len && v == value_type => KeyBlockFormat::Fixed {
113                key_len: k,
114                value_type: v,
115            },
116            KeyBlockFormat::Fixed { .. } | KeyBlockFormat::Variable => KeyBlockFormat::Variable,
117        };
118    }
119}
120
/// Copy-able snapshot of the accumulator state needed by [`flush_key_block`].
#[derive(Clone, Copy)]
struct KeyBlockFlushInfo {
    /// Maximum key length in the block (determines whether per-entry hashes are stored).
    max_key_len: usize,
    /// Whether the block qualifies for fixed-size layout.
    format: KeyBlockFormat,
}
127
/// Tracks the accumulated state of the current incomplete key block.
///
/// During streaming, this sits on [`StreamingSstWriter`] and tracks the tail of the resolved
/// prefix. Entries are added one at a time; when [`should_flush`](Self::should_flush) returns
/// `true`, the caller should flush the block and call [`reset`](Self::reset).
struct KeyBlockAccumulator {
    /// Accumulated byte size (keys + per-entry overhead) of entries in this block.
    size: usize,
    /// Number of entries accumulated so far.
    entry_count: usize,
    /// Maximum key length among accumulated entries (determines whether hashes are stored).
    max_key_len: usize,
    /// Hash of the most recently added entry (used to avoid splitting entries with equal hashes
    /// across blocks).
    last_hash: u64,
    /// Whether the block qualifies for fixed-size layout.
    format: KeyBlockFormat,
}
146
147impl KeyBlockAccumulator {
148    fn new() -> Self {
149        Self {
150            size: 0,
151            entry_count: 0,
152            max_key_len: 0,
153            last_hash: 0,
154            format: KeyBlockFormat::Unknown,
155        }
156    }
157
158    /// Records a new entry in the accumulator.
159    fn add(&mut self, key_len: usize, key_hash: u64, value_type: EntryType) {
160        self.size += key_len + KEY_BLOCK_ENTRY_META_OVERHEAD;
161        self.max_key_len = self.max_key_len.max(key_len);
162        self.entry_count += 1;
163        self.last_hash = key_hash;
164        self.format.update(key_len, value_type);
165    }
166
167    /// Snapshots the state needed by `flush_key_block`.
168    fn flush_info(&self) -> KeyBlockFlushInfo {
169        KeyBlockFlushInfo {
170            max_key_len: self.max_key_len,
171            format: self.format,
172        }
173    }
174
175    /// Returns `true` if the block should be flushed before adding an entry with the given key
176    /// length and hash. Returns `false` for empty blocks and when the next entry shares its hash
177    /// with the current last entry (to avoid splitting equal-hash runs).
178    fn should_flush(&self, next_key_len: usize, next_key_hash: u64) -> bool {
179        if self.entry_count == 0 {
180            return false;
181        }
182        let would_exceed_size =
183            self.size + next_key_len + KEY_BLOCK_ENTRY_META_OVERHEAD > MAX_KEY_BLOCK_SIZE;
184        let would_exceed_entries = self.entry_count >= MAX_KEY_BLOCK_ENTRIES;
185        // Never split entries with the same hash across blocks.
186        (would_exceed_size || would_exceed_entries) && self.last_hash != next_key_hash
187    }
188
189    /// Resets the accumulator for a new key block.
190    fn reset(&mut self) {
191        self.size = 0;
192        self.entry_count = 0;
193        self.max_key_len = 0;
194        self.format = KeyBlockFormat::Unknown;
195        // last_hash is intentionally not reset -- it is overwritten on the next add() call.
196    }
197}
198
/// Determines whether to store the hash per entry based on max key length.
///
/// Returns `true` when the longest key in the block exceeds the threshold, meaning key blocks
/// store the 64-bit hash alongside each entry; for shorter keys the hash is omitted.
fn use_hash(max_key_len: usize) -> bool {
    // Named threshold instead of a bare magic number. Presumably chosen so that for keys up to
    // 32 bytes, comparing the key directly is cheap enough that storing an 8-byte hash per
    // entry is not worth the space -- TODO confirm against the reader's lookup path.
    const HASH_KEY_LEN_THRESHOLD: usize = 32;
    max_key_len > HASH_KEY_LEN_THRESHOLD
}
203
/// Trait for entries from which SST files can be created.
pub trait Entry {
    /// Returns the hash of the key.
    fn key_hash(&self) -> u64;
    /// Returns the length of the key in bytes.
    fn key_len(&self) -> usize;
    /// Appends the key bytes to `buf`.
    fn write_key_to(&self, buf: &mut Vec<u8>);

    /// Returns the value (or tombstone) associated with this entry.
    fn value(&self) -> EntryValue<'_>;
}
216
217impl<E: Entry> Entry for &E {
218    fn key_hash(&self) -> u64 {
219        (*self).key_hash()
220    }
221    fn key_len(&self) -> usize {
222        (*self).key_len()
223    }
224    fn write_key_to(&self, buf: &mut Vec<u8>) {
225        (*self).write_key_to(buf)
226    }
227    fn value(&self) -> EntryValue<'_> {
228        (*self).value()
229    }
230}
231
/// Reference to a value, classified by how it will be stored on disk.
#[derive(Copy, Clone)]
pub enum EntryValue<'l> {
    /// Inline value stored directly in the key block.
    Inline { value: &'l [u8] },
    /// Small-sized value. They are stored in shared value blocks.
    Small { value: &'l [u8] },
    /// Medium-sized value. They are stored in their own value block.
    Medium { value: &'l [u8] },
    /// Medium-sized value. They are stored in their own value block. In the raw form as on disk
    /// (already compressed and checksummed), so it can be copied without re-encoding.
    MediumRaw {
        /// The uncompressed size of the block data. `0` means the block is stored uncompressed
        /// (and thus the size is the `len` of the block)
        uncompressed_size: u32,
        /// CRC32 checksum of the on-disk block data (after compression).
        checksum: u32,
        block: &'l [u8],
    },
    /// Large-sized value. They are stored in a blob file.
    Large { blob: u32 },
    /// Tombstone. The value was removed.
    Deleted,
}
255
/// Metadata produced for a finished SST file, consumed by the meta file writer.
#[derive(Debug, Clone)]
pub struct StaticSortedFileBuilderMeta<'a> {
    /// The minimum hash of the keys in the SST file
    pub min_hash: u64,
    /// The maximum hash of the keys in the SST file
    pub max_hash: u64,
    /// The AMQF data
    pub amqf: Cow<'a, [u8]>,
    /// The number of blocks in the SST file
    pub block_count: u16,
    /// The file size of the SST file
    pub size: u64,
    /// The status flags for this SST file
    pub flags: MetaEntryFlags,
    /// The number of entries in the SST file
    pub entries: u64,
}
273
274/// Writes an SST file from a pre-sorted slice of entries.
275///
276/// This is a convenience wrapper around [`StreamingSstWriter`] for callers that already have all
277/// entries in memory.
278// TODO: Consider adding a variant that takes ownership (Vec<E> or drain iterator)
279// to free entry memory as blocks are written.
280pub fn write_static_stored_file<E: Entry>(
281    entries: &[E],
282    file: &Path,
283    flags: MetaEntryFlags,
284) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
285    debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted());
286    let mut writer = StreamingSstWriter::new(file, flags, entries.len() as u64)?;
287    for entry in entries {
288        writer.add(entry)?;
289    }
290    writer.close()
291}
292
293// ---------------------------------------------------------------------------
294// Block I/O helpers (free functions for borrow-checker friendliness)
295// ---------------------------------------------------------------------------
296
297/// Writes a raw (already-formatted) block to the file. Returns the block index assigned.
298///
299/// `uncompressed_size` is the original uncompressed size of the block data, or `0` if the block
300/// is stored uncompressed.
301fn write_raw_block_to_file(
302    file: &mut BufWriter<File>,
303    block_offsets: &mut Vec<u32>,
304    uncompressed_size: u32,
305    checksum: u32,
306    block: &[u8],
307) -> Result<u16> {
308    let block_index: u16 = block_offsets
309        .len()
310        .try_into()
311        .expect("Block index overflow");
312
313    let len: u32 = (block.len() + BLOCK_HEADER_SIZE).try_into().unwrap();
314    let offset = block_offsets
315        .last()
316        .copied()
317        .unwrap_or_default()
318        .checked_add(len)
319        .expect("Block offset overflow");
320    block_offsets.push(offset);
321
322    file.write_u32::<BE>(uncompressed_size)
323        .context("Failed to write uncompressed size")?;
324    file.write_u32::<BE>(checksum)
325        .context("Failed to write checksum")?;
326    file.write_all(block)
327        .context("Failed to write block data")?;
328    Ok(block_index)
329}
330
331/// Writes a block to the file, optionally compressing it. Returns the block index assigned.
332fn write_block_to_file(
333    file: &mut BufWriter<File>,
334    compress_buffer: &mut Vec<u8>,
335    block_offsets: &mut Vec<u32>,
336    block: &[u8],
337    try_compress: bool,
338) -> Result<u16> {
339    let (uncompressed_size, data_to_write): (u32, &[u8]) = if try_compress {
340        compress_into_buffer(block, compress_buffer)?;
341        // Same threshold as LevelDB/RocksDB: require at least 12.5% savings.
342        if compress_buffer.len() < block.len() - (block.len() / 8) {
343            (block.len().try_into().unwrap(), compress_buffer.as_slice())
344        } else {
345            (0, block)
346        }
347    } else {
348        (0, block)
349    };
350
351    // Checksum is computed on the on-disk data (after compression).
352    let checksum = checksum_block(data_to_write);
353
354    let result = write_raw_block_to_file(
355        file,
356        block_offsets,
357        uncompressed_size,
358        checksum,
359        data_to_write,
360    );
361    compress_buffer.clear();
362    result
363}
364
365// ---------------------------------------------------------------------------
366// StreamingSstWriter
367// ---------------------------------------------------------------------------
368
/// Where a key entry's value lives (or will live once the small block flushes).
enum ValueRef {
    /// Value in a known small value block (already flushed).
    Small {
        block_index: u16,
        offset: u32,
        size: u16,
    },
    /// Value is in a small value block that hasn't been written yet. Will be resolved in-place
    /// to [`ValueRef::Small`] when the small block is flushed.
    PendingSmall {
        // Debug-only: which pending small block this entry belongs to (consistency check).
        #[cfg(debug_assertions)]
        small_block_id: u16,
        offset: u32,
        size: u16,
    },
    /// Medium value already written to its own block.
    Medium { block_index: u16 },
    /// Inline value (stored directly in the key block).
    Inline {
        data: [u8; MAX_INLINE_VALUE_SIZE],
        len: u8,
    },
    /// Large blob stored externally.
    Blob { blob_id: u32 },
    /// Tombstone.
    Deleted,
}
397
398impl ValueRef {
399    /// Returns the key block entry type for this value reference.
400    fn entry_type(&self) -> EntryType {
401        EntryType(match self {
402            ValueRef::Small { .. } | ValueRef::PendingSmall { .. } => KEY_BLOCK_ENTRY_TYPE_SMALL,
403            ValueRef::Medium { .. } => KEY_BLOCK_ENTRY_TYPE_MEDIUM,
404            ValueRef::Inline { len, .. } => KEY_BLOCK_ENTRY_TYPE_INLINE_MIN + *len,
405            ValueRef::Blob { .. } => KEY_BLOCK_ENTRY_TYPE_BLOB,
406            ValueRef::Deleted => KEY_BLOCK_ENTRY_TYPE_DELETED,
407        })
408    }
409
410    /// Writes the value bytes for this reference to a buffer.
411    ///
412    /// This is the shared serialization logic used by both variable-size and fixed-size key block
413    /// builders.
414    fn write_value_to(&self, buffer: &mut Vec<u8>) {
415        match self {
416            ValueRef::Small {
417                block_index,
418                offset,
419                size,
420            } => {
421                let mut scratch = [0u8; 8];
422                BE::write_u16(&mut scratch, *block_index);
423                BE::write_u16(&mut scratch[2..], *size);
424                BE::write_u32(&mut scratch[4..], *offset);
425                buffer.extend(&scratch);
426            }
427            ValueRef::Medium { block_index } => {
428                let mut scratch = [0u8; 2];
429                BE::write_u16(&mut scratch, *block_index);
430                buffer.extend(scratch);
431            }
432            ValueRef::Inline { data, len } => {
433                buffer.extend(&data[..*len as usize]);
434            }
435            ValueRef::Blob { blob_id } => {
436                let mut scratch = [0u8; 4];
437                BE::write_u32(&mut scratch, *blob_id);
438                buffer.extend(scratch);
439            }
440            ValueRef::Deleted => { /* no value bytes */ }
441            ValueRef::PendingSmall { .. } => {
442                unreachable!("PendingSmall should have been resolved");
443            }
444        }
445    }
446}
447
/// A key entry queued in `pending_keys`, paired with where its value lives (or will live).
struct PendingEntry<E> {
    entry: E,
    value_ref: ValueRef,
}
452
/// A streaming SST file writer that writes blocks to disk incrementally.
///
/// Instead of materializing all entries in memory and then writing all value blocks followed by all
/// key blocks, this writer interleaves block writes as entries arrive. Medium values are written
/// immediately, small values are accumulated into blocks, and key blocks are flushed as soon as
/// their value references are all resolved.
///
/// The SST reader is block-index-addressed (not file-position-addressed), so interleaving block
/// types is fully compatible.
pub struct StreamingSstWriter<E: Entry> {
    // File I/O. Wrapped in Option so close() can take ownership without a partial-move
    // compile error (partial moves are forbidden when the type has a Drop impl).
    file: Option<BufWriter<File>>,
    /// Reusable scratch buffer for compression attempts; cleared after every block write.
    compress_buffer: Vec<u8>,
    /// Cumulative end offset (header included) of each block written so far. Its length is the
    /// next block index.
    block_offsets: Vec<u32>,

    /// Pending key entries waiting to be flushed as key blocks.
    ///
    /// Entries are appended at the back and drained from the front once flushed.
    ///
    /// ```text
    ///  Resolved entries              Unresolved entries
    ///  (value block index known)     (PendingSmall references)
    /// |------------------------------|--------------------------|
    /// 0                     first_pending_small_index         len()
    ///
    ///  ^-- current_key_block tracks      ^-- these wait for
    ///      the incomplete tail block         flush_small_value_block()
    ///      within this region                to resolve them
    /// ```
    ///
    /// [`advance_boundary_to`](Self::advance_boundary_to) scans the resolved prefix, flushes
    /// complete key blocks from the front, and drains them. When a small value block is flushed,
    /// all `PendingSmall` entries are resolved in-place and the boundary advances to `len()`.
    ///
    /// **Unbounded growth note:** If a small number of small values appear early, followed by
    /// many medium/inline values, the queue grows because the front entries block on the
    /// unflushed small value block while the back keeps accepting resolved entries.
    pending_keys: VecDeque<PendingEntry<E>>,

    /// Index into `pending_keys` of the first entry that has a `PendingSmall` reference for the
    /// current (unflushed) small value block. All entries before this index are fully resolved
    /// (their value block indices are known). Equals `pending_keys.len()` when no pending small
    /// entries exist.
    first_pending_small_index: usize,

    /// The current small_block_id being accumulated into (debug-only consistency check).
    #[cfg(debug_assertions)]
    current_small_block_id: u16,

    // Pending small value block buffer.
    pending_small_value_block: Vec<u8>,

    // Reusable buffer for building key blocks
    key_buffer: Vec<u8>,

    // Collected key hashes truncated to u32 for deferred AMQF construction via sorted Builder
    // in close(). Fingerprint size is always <32 bits, so the lower 32 bits suffice.
    collected_fingerprints: Vec<u32>,

    // Index block data: (first_hash, block_index) for each key block written
    key_block_boundaries: Vec<(u64, u16)>,

    // Metadata
    min_hash: u64,
    max_hash: u64,
    entry_count: u64,
    flags: MetaEntryFlags,

    // Fullness tracking (for compaction callers)
    total_key_size: usize,
    total_value_size: usize,

    /// Total byte size of keys in `pending_keys` (for block capacity estimation).
    pending_key_total_size: usize,

    /// State of the current incomplete key block at the tail of the resolved prefix.
    current_key_block: KeyBlockAccumulator,

    /// Set to `true` by `close()` so the Drop guard can detect writers dropped without closing.
    #[cfg(debug_assertions)]
    finished: bool,
}
536
537impl<E: Entry> StreamingSstWriter<E> {
538    /// Creates a new streaming SST writer.
539    ///
540    /// `max_entry_count` is used to pre-allocate buffers and estimate block counts.
541    pub fn new(file: &Path, flags: MetaEntryFlags, max_entry_count: u64) -> Result<Self> {
542        let file = BufWriter::new(File::create(file)?);
543
544        // Estimate number of key blocks based on max entry count.
545        // Each key block holds up to MAX_KEY_BLOCK_ENTRIES entries.
546        let estimated_key_blocks = (max_entry_count as usize)
547            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
548            .max(1);
549        // Estimate value blocks assuming all entries are small values of average size.
550        // Each small value block holds ~MIN_SMALL_VALUE_BLOCK_SIZE / AVG_SMALL_VALUE_SIZE entries.
551        let entries_per_value_block = MIN_SMALL_VALUE_BLOCK_SIZE / AVG_SMALL_VALUE_SIZE;
552        let estimated_value_blocks = (max_entry_count as usize)
553            .div_ceil(entries_per_value_block)
554            .max(1);
555        let estimated_total_blocks = estimated_key_blocks + estimated_value_blocks + 1;
556
557        Ok(Self {
558            file: Some(file),
559            compress_buffer: Vec::with_capacity(MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE),
560            block_offsets: Vec::with_capacity(estimated_total_blocks),
561            pending_keys: VecDeque::with_capacity(entries_per_value_block),
562            first_pending_small_index: 0,
563            #[cfg(debug_assertions)]
564            current_small_block_id: 0,
565            pending_small_value_block: Vec::with_capacity(
566                MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE,
567            ),
568            key_buffer: Vec::with_capacity(MAX_KEY_BLOCK_SIZE),
569            collected_fingerprints: Vec::with_capacity(max_entry_count as usize),
570            key_block_boundaries: Vec::with_capacity(estimated_key_blocks),
571            min_hash: u64::MAX,
572            max_hash: 0,
573            entry_count: 0,
574            flags,
575            total_key_size: 0,
576            total_value_size: 0,
577            pending_key_total_size: 0,
578            current_key_block: KeyBlockAccumulator::new(),
579            #[cfg(debug_assertions)]
580            finished: false,
581        })
582    }
583
584    /// Returns true if the SST file has reached capacity limits.
585    ///
586    /// This is intended for compaction callers that need to split output across multiple SST files.
587    pub fn is_full(&self, max_entries: usize, max_data_size: usize) -> bool {
588        self.entry_count as usize >= max_entries
589            || self.total_key_size + self.total_value_size >= max_data_size
590            || !self.has_block_index_capacity()
591    }
592
593    /// Returns true if the SST file has room for more blocks without overflowing the `u16` block
594    /// index. Uses the exact count of blocks already written plus a conservative estimate of
595    /// blocks still needed for pending entries and the index.
596    fn has_block_index_capacity(&self) -> bool {
597        let blocks_written = self.block_offsets.len();
598        // Blocks still needed:
599        // - 1 pending small value block (if buffer is non-empty)
600        // - key blocks for pending entries (upper bound from both entry count and byte size)
601        // - 1 index block
602        let pending_small_block = usize::from(!self.pending_small_value_block.is_empty());
603        let pending_key_blocks = self
604            .pending_keys
605            .len()
606            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
607            .max(self.pending_key_total_size.div_ceil(MAX_KEY_BLOCK_SIZE))
608            .max(1);
609        let index_block = 1;
610        let buffer = BLOCK_INDEX_CAPACITY_BUFFER;
611        blocks_written + pending_small_block + pending_key_blocks + index_block + buffer
612            < u16::MAX as usize
613    }
614
    /// Adds an entry to the SST file. Entries must be added in (key-hash, key) order.
    ///
    /// Routing by value kind: medium values are written to their own block immediately,
    /// small values are buffered into the shared pending small value block, and
    /// inline/blob/deleted values carry their reference directly in the key block entry.
    pub fn add(&mut self, entry: E) -> Result<()> {
        let key_hash = entry.key_hash();
        let key_len = entry.key_len();

        // Update metadata. min_hash only needs setting once because entries arrive sorted.
        if self.entry_count == 0 {
            self.min_hash = key_hash;
        }
        self.max_hash = key_hash;
        self.entry_count += 1;

        // Collect hash for deferred AMQF construction in close()
        self.collected_fingerprints.push(key_hash as u32);

        // Track key size for fullness and block capacity
        self.total_key_size += key_len;
        self.pending_key_total_size += key_len;

        // Route value
        let value_ref = match entry.value() {
            EntryValue::Medium { value } => {
                self.total_value_size += value.len();
                let block_index = write_block_to_file(
                    self.file.as_mut().unwrap(),
                    &mut self.compress_buffer,
                    &mut self.block_offsets,
                    value,
                    true,
                )
                .context("Failed to write value block")?;
                ValueRef::Medium { block_index }
            }
            EntryValue::MediumRaw {
                uncompressed_size,
                checksum,
                block,
            } => {
                // Note: tracks compressed block size (not uncompressed) unlike EntryValue::Medium.
                // Both are acceptable approximations of disk usage for is_full() thresholds.
                self.total_value_size += block.len();
                let block_index = write_raw_block_to_file(
                    self.file.as_mut().unwrap(),
                    &mut self.block_offsets,
                    uncompressed_size,
                    checksum,
                    block,
                )
                .context("Failed to write compressed value block")?;
                ValueRef::Medium { block_index }
            }
            EntryValue::Small { value } => {
                self.total_value_size += value.len();

                let offset = self.pending_small_value_block.len() as u32;
                // Small values are at most MAX_SMALL_VALUE_SIZE (4096), so this fits in a u16.
                let size: u16 = value.len().try_into().unwrap();
                self.pending_small_value_block.extend_from_slice(value);

                // Track where the first PendingSmall entry is in the queue
                if self.first_pending_small_index >= self.pending_keys.len() {
                    self.first_pending_small_index = self.pending_keys.len();
                }

                let value_ref = ValueRef::PendingSmall {
                    #[cfg(debug_assertions)]
                    small_block_id: self.current_small_block_id,
                    offset,
                    size,
                };

                self.push_pending_key_entry(entry, value_ref);

                // Eagerly flush the small block AFTER pushing the new entry. This resolves
                // the just-pushed entry immediately via advance_boundary_to(), so key blocks
                // can be flushed incrementally.
                if self.pending_small_value_block.len() >= MIN_SMALL_VALUE_BLOCK_SIZE {
                    self.flush_small_value_block()?;
                }

                // Early return: the Small path pushes its own entry and must not fall through
                // to the shared push/flush at the bottom.
                return Ok(());
            }
            EntryValue::Inline { value } => {
                debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
                let mut data = [0u8; MAX_INLINE_VALUE_SIZE];
                data[..value.len()].copy_from_slice(value);
                ValueRef::Inline {
                    data,
                    len: value.len() as u8,
                }
            }
            EntryValue::Large { blob } => ValueRef::Blob { blob_id: blob },
            EntryValue::Deleted => ValueRef::Deleted,
        };

        self.push_pending_key_entry(entry, value_ref);
        self.try_flush_key_blocks()
    }
712
713    /// Appends a new entry to the pending-keys queue.
714    fn push_pending_key_entry(&mut self, entry: E, value_ref: ValueRef) {
715        self.pending_keys
716            .push_back(PendingEntry { entry, value_ref });
717    }
718
719    /// Advances `first_pending_small_index` past the just-pushed entry if it is resolved and
720    /// sits right at the current boundary. Flushes complete key blocks incrementally.
721    ///
722    /// Must be called immediately after [`push_pending_key_entry`] with a resolved
723    /// (non-`PendingSmall`) entry.
724    fn try_flush_key_blocks(&mut self) -> Result<()> {
725        debug_assert!(!matches!(
726            self.pending_keys.back().unwrap().value_ref,
727            ValueRef::PendingSmall { .. }
728        ));
729        if self.first_pending_small_index != self.pending_keys.len() - 1 {
730            // Boundary is blocked by earlier unresolved PendingSmall entries.
731            return Ok(());
732        }
733        self.advance_boundary_to(self.pending_keys.len())
734    }
735
    /// Advances the resolved boundary from its current position to `new_boundary`,
    /// incrementally tracking key block sizes and flushing complete key blocks.
    ///
    /// All entries in `pending_keys[self.first_pending_small_index..new_boundary]`
    /// must have resolved (non-`PendingSmall`) value references.
    fn advance_boundary_to(&mut self, new_boundary: usize) -> Result<()> {
        // End index (within pending_keys) of the last key block flushed in this call.
        let mut last_flushed_end = 0usize;
        // Cumulative key sizes of all entries visited so far, and the snapshot at the last
        // flush point. The difference at the end gives the total key size of drained entries.
        let mut cumulative_key_size = 0usize;
        let mut flushed_key_size = 0usize;

        for i in self.first_pending_small_index..new_boundary {
            let entry = &self.pending_keys[i];
            let key_len = entry.entry.key_len();
            let key_hash = entry.entry.key_hash();
            let value_type = entry.value_ref.entry_type();

            // Flush BEFORE adding entry i, so the flushed block ends at the previous entry.
            if self.current_key_block.should_flush(key_len, key_hash) {
                let block_end = last_flushed_end + self.current_key_block.entry_count;
                let info = self.current_key_block.flush_info();
                self.flush_key_block(last_flushed_end, block_end, info)?;
                flushed_key_size = cumulative_key_size;
                last_flushed_end = block_end;
                self.current_key_block.reset();
            }

            cumulative_key_size += key_len;
            self.current_key_block.add(key_len, key_hash, value_type);
        }

        if last_flushed_end > 0 {
            self.pending_key_total_size -= flushed_key_size;
            self.pending_keys.drain(..last_flushed_end);
        }

        // Remaining entries shifted down by the number drained; the boundary moves with them.
        self.first_pending_small_index = new_boundary - last_flushed_end;
        Ok(())
    }
775
    /// Flushes the current pending small value block to disk and resolves all `PendingSmall`
    /// entries in-place.
    ///
    /// After this returns, no entry in `pending_keys` holds a `PendingSmall` reference, and
    /// the resolved boundary has been advanced over the whole queue (flushing any key blocks
    /// that became complete along the way).
    fn flush_small_value_block(&mut self) -> Result<()> {
        // Early return if empty -- this simplifies trailing small value block handling in
        // `close()` where we call this unconditionally.
        if self.pending_small_value_block.is_empty() {
            return Ok(());
        }

        // Write the accumulated small values as one block (compression enabled); the
        // returned index is what the key entries will reference.
        let block_index = write_block_to_file(
            self.file.as_mut().unwrap(),
            &mut self.compress_buffer,
            &mut self.block_offsets,
            &self.pending_small_value_block,
            true,
        )
        .context("Failed to write small value block")?;

        // Resolve all PendingSmall entries for this block in-place.
        // Only scan from first_pending_small_index -- entries before it are guaranteed
        // already resolved (from previous flush calls).
        #[cfg(debug_assertions)]
        let flushed_id = self.current_small_block_id;
        for i in self.first_pending_small_index..self.pending_keys.len() {
            let entry = &mut self.pending_keys[i];
            if let ValueRef::PendingSmall {
                #[cfg(debug_assertions)]
                small_block_id,
                offset,
                size,
            } = entry.value_ref
            {
                #[cfg(debug_assertions)]
                debug_assert_eq!(
                    small_block_id, flushed_id,
                    "all pending small entries must reference the small value block that was just \
                     written"
                );
                // Rewrite the placeholder into a concrete reference to the block just
                // written; the offset/size within the block stay the same.
                entry.value_ref = ValueRef::Small {
                    block_index,
                    offset,
                    size,
                };
            }
        }

        // All PendingSmall entries are now resolved. Advance the boundary through all of
        // them, flushing key blocks incrementally as we go.
        self.advance_boundary_to(self.pending_keys.len())?;

        // Advance to next small block id (debug-only consistency check)
        #[cfg(debug_assertions)]
        {
            self.current_small_block_id += 1;
        }
        // clear() keeps the allocation for the next small value block.
        self.pending_small_value_block.clear();

        Ok(())
    }
835
836    /// Flushes a single key block from `pending_keys[start..end]`.
837    fn flush_key_block(&mut self, start: usize, end: usize, info: KeyBlockFlushInfo) -> Result<()> {
838        let entry_count = end - start;
839        let has_hash = use_hash(info.max_key_len);
840        let try_compress = info.max_key_len >= MIN_KEY_SIZE_FOR_COMPRESSION;
841
842        self.key_buffer.clear();
843
844        if let KeyBlockFormat::Fixed {
845            key_len: key_size,
846            value_type,
847        } = info.format
848        {
849            let mut builder = FixedKeyBlockBuilder::new(
850                &mut self.key_buffer,
851                entry_count as u32,
852                has_hash,
853                key_size,
854                value_type,
855            );
856            for i in start..end {
857                let pending = &self.pending_keys[i];
858                builder.put(&pending.entry, &pending.value_ref, has_hash);
859            }
860            builder.finish();
861        } else {
862            let mut builder =
863                KeyBlockBuilder::new(&mut self.key_buffer, entry_count as u32, has_hash);
864
865            for i in start..end {
866                let pending = &self.pending_keys[i];
867                builder.put(&pending.entry, &pending.value_ref, has_hash);
868            }
869
870            builder.finish();
871        }
872
873        // Record boundary
874        let first_hash = self.pending_keys[start].entry.key_hash();
875        let block_index = write_block_to_file(
876            self.file.as_mut().unwrap(),
877            &mut self.compress_buffer,
878            &mut self.block_offsets,
879            &self.key_buffer,
880            try_compress,
881        )
882        .context("Failed to write key block")?;
883        self.key_block_boundaries.push((first_hash, block_index));
884
885        Ok(())
886    }
887
    /// Finishes writing the SST file. Flushes remaining blocks, writes the index, and returns
    /// metadata.
    ///
    /// The index block and the block offset table are appended after the data blocks; the
    /// file-level metadata (hash range, AMQF, counts) is returned to the caller rather than
    /// written into the file.
    pub fn close(mut self) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
        // Mark as finished so the debug-only Drop check does not fire.
        #[cfg(debug_assertions)]
        {
            self.finished = true;
        }

        // Flush remaining small value block (even if under MIN_SMALL_VALUE_BLOCK_SIZE).
        self.flush_small_value_block()?;

        // Now all PendingSmall entries are resolved. Flush all remaining key blocks.
        self.flush_remaining_key_blocks()?;

        assert!(
            !self.key_block_boundaries.is_empty(),
            "StreamingSstWriter::close() called with no entries"
        );

        let mut file = self.file.take().unwrap();

        // Write index block (never compressed). Buffer into a Vec first so we can
        // compute the checksum, then write via the standard block helper.
        // The first boundary becomes the header's `first_block` pointer, so the index
        // stores one entry per *remaining* boundary (hence `len() - 1`).
        let index_entry_count: u16 = (self.key_block_boundaries.len() - 1)
            .try_into()
            .expect("Index entries count overflow");
        let index_block_size: usize =
            INDEX_BLOCK_HEADER_SIZE + index_entry_count as usize * INDEX_BLOCK_ENTRY_SIZE;
        let mut index_buf = Vec::with_capacity(index_block_size);
        {
            let first_block = self.key_block_boundaries[0].1;
            let mut index_block = IndexBlockBuilder::new(&mut index_buf, first_block);
            for &(hash, block) in &self.key_block_boundaries[1..] {
                index_block.put(hash, block);
            }
        }
        let index_checksum = checksum_block(&index_buf);
        // uncompressed_size = 0 marks the block as stored as-is (no compression).
        write_raw_block_to_file(
            &mut file,
            &mut self.block_offsets,
            0,
            index_checksum,
            &index_buf,
        )
        .context("Failed to write index block")?;

        // Write block offset table
        for offset in &self.block_offsets {
            file.write_u32::<BE>(*offset)
                .context("Failed to write block offset")?;
        }

        let block_count: u16 = self
            .block_offsets
            .len()
            .try_into()
            .expect("Block count overflow");

        // Build AMQF from collected hashes using sorted Builder insertion.
        // Hashes are already sorted by key_hash (SST invariant), but fingerprints
        // (truncated hashes) may not be sorted, so we sort by `fingerprint & mask`.
        // `max(1)` sidesteps the degenerate zero-item filter construction.
        let actual_count = self.collected_fingerprints.len() as u64;
        let mut builder = qfilter::Builder::new(
            qfilter::Filter::new(actual_count.max(1), AMQF_FALSE_POSITIVE_RATE)
                .expect("Filter can't be constructed"),
        );

        let fp_size = builder.fingerprint_size();
        assert!(fp_size < 32, "fp_size {fp_size} exceeds u32");
        let fp_mask = (1u32 << fp_size) - 1;
        // Mask in-place to fingerprint size and sort.
        // NOTE(review): `h` is inserted *unmasked* below; this assumes
        // `insert_fingerprint` truncates to `fp_size` bits internally so the
        // sorted-by-masked order matches insertion order -- confirm against qfilter docs.
        self.collected_fingerprints
            .sort_unstable_by_key(|&h| h & fp_mask);
        for &h in &self.collected_fingerprints {
            builder
                .insert_fingerprint(false, h as u64)
                .expect("AMQF insert failed");
        }
        let filter = builder.into_filter();

        // Serialize AMQF using postcard for zero-copy deserialization via FilterRef
        let amqf = postcard::to_allocvec(&filter).expect("AMQF serialization failed");

        // Compute file size from block offsets rather than calling stream_position()
        // (which requires a flush + seek).
        let last_block_end = self.block_offsets.last().copied().unwrap_or_default() as u64;
        let offset_table_size = block_count as u64 * size_of::<u32>() as u64;
        let file_size = last_block_end + offset_table_size;

        let meta = StaticSortedFileBuilderMeta {
            min_hash: self.min_hash,
            max_hash: self.max_hash,
            amqf: Cow::Owned(amqf),
            block_count,
            size: file_size,
            flags: self.flags,
            entries: self.entry_count,
        };

        // into_inner() flushes the buffered writer and hands back the raw File.
        Ok((meta, file.into_inner()?))
    }
989
990    /// Flushes all remaining entries as key blocks. Called from `close()` after all small value
991    /// blocks have been flushed, so all PendingSmall entries are resolved.
992    ///
993    /// This loop mirrors [`advance_boundary_to`], but uses a local accumulator (since the
994    /// `self.current_key_block` state is stale) and flushes the final incomplete block
995    /// (unlike `advance_boundary_to`, which keeps it for more entries during streaming).
996    fn flush_remaining_key_blocks(&mut self) -> Result<()> {
997        if self.pending_keys.is_empty() {
998            return Ok(());
999        }
1000
1001        // After flush_small_value_block() in close(), no PendingSmall entries should remain.
1002        // first_pending_small_index may be non-zero (when all entries are medium/inline/etc
1003        // and advance_boundary_to was never called), but it must equal pending_keys.len(),
1004        // meaning no entries after the boundary exist.
1005        debug_assert_eq!(
1006            self.first_pending_small_index,
1007            self.pending_keys.len(),
1008            "expected no unresolved PendingSmall entries after flush_small_value_block"
1009        );
1010
1011        let total = self.pending_keys.len();
1012        let mut block_start = 0;
1013        let mut acc = KeyBlockAccumulator::new();
1014
1015        for i in 0..total {
1016            let entry = &self.pending_keys[i];
1017            let key_len = entry.entry.key_len();
1018            let key_hash = entry.entry.key_hash();
1019            let value_type = entry.value_ref.entry_type();
1020
1021            if acc.should_flush(key_len, key_hash) {
1022                self.flush_key_block(block_start, i, acc.flush_info())?;
1023                block_start = i;
1024                acc.reset();
1025            }
1026
1027            acc.add(key_len, key_hash, value_type);
1028        }
1029
1030        // Flush the final block
1031        if block_start < total {
1032            self.flush_key_block(block_start, total, acc.flush_info())?;
1033        }
1034
1035        // Free VecDeque memory. Numeric fields are not reset because close() consumes self.
1036        self.pending_keys.clear();
1037        Ok(())
1038    }
1039}
1040
1041#[cfg(debug_assertions)]
1042impl<E: Entry> Drop for StreamingSstWriter<E> {
1043    fn drop(&mut self) {
1044        // Skip assertion during panic unwinding to avoid a double-panic (which would abort).
1045        if !std::thread::panicking() {
1046            assert!(
1047                self.finished || self.entry_count == 0,
1048                "StreamingSstWriter dropped without calling close()"
1049            );
1050        }
1051    }
1052}
1053
1054// ---------------------------------------------------------------------------
1055// KeyBlockBuilder
1056// ---------------------------------------------------------------------------
1057
/// Builder for a single key block.
///
/// Entries are added via `put_*` methods which write key data and value references into the buffer.
/// The block format uses a fixed-size header table followed by variable-length entry data.
struct KeyBlockBuilder<'l> {
    /// Index of the next entry to be written; selects the slot in the header table.
    current_entry: usize,
    /// Buffer length right after the header table was written; entry data positions are
    /// stored relative to this offset.
    header_size: usize,
    /// Destination buffer the block is serialized into.
    buffer: &'l mut Vec<u8>,
}

/// The size of the key block header (block type + entry count).
const KEY_BLOCK_HEADER_SIZE: usize = 4;
1070
1071impl<'l> KeyBlockBuilder<'l> {
1072    /// Creates a new key block builder for the number of entries.
1073    fn new(buffer: &'l mut Vec<u8>, entry_count: u32, has_hash: bool) -> Self {
1074        debug_assert!(entry_count < (1 << 24));
1075
1076        const ESTIMATED_KEY_SIZE: usize = 16;
1077        buffer.reserve(entry_count as usize * ESTIMATED_KEY_SIZE);
1078        let block_type = if has_hash {
1079            BLOCK_TYPE_KEY_WITH_HASH
1080        } else {
1081            BLOCK_TYPE_KEY_NO_HASH
1082        };
1083        buffer.write_u8(block_type).unwrap();
1084        buffer.write_u24::<BE>(entry_count).unwrap();
1085        for _ in 0..entry_count {
1086            buffer.write_u32::<BE>(0).unwrap();
1087        }
1088        Self {
1089            current_entry: 0,
1090            header_size: buffer.len(),
1091            buffer,
1092        }
1093    }
1094
1095    /// Writes the entry header (position + type) for the current entry.
1096    fn write_entry_header(&mut self, entry_type: EntryType) {
1097        let pos = self.buffer.len() - self.header_size;
1098        let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4;
1099        let header = (pos as u32) | ((entry_type.0 as u32) << 24);
1100        BE::write_u32(&mut self.buffer[header_offset..header_offset + 4], header);
1101    }
1102
1103    /// Writes a single entry (header + hash + key + value data) to the block.
1104    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
1105        self.write_entry_header(value_ref.entry_type());
1106        if has_hash {
1107            self.buffer
1108                .extend_from_slice(&entry.key_hash().to_be_bytes());
1109        }
1110        entry.write_key_to(self.buffer);
1111        value_ref.write_value_to(self.buffer);
1112        self.current_entry += 1;
1113    }
1114
1115    /// Returns the key block buffer.
1116    fn finish(self) -> &'l mut Vec<u8> {
1117        self.buffer
1118    }
1119}
1120
1121// ---------------------------------------------------------------------------
1122// FixedKeyBlockBuilder
1123// ---------------------------------------------------------------------------
1124
/// The size of the fixed-size key block header (block type + entry count + key size + value type).
const FIXED_KEY_BLOCK_HEADER_SIZE: usize = 6;

/// Builder for a fixed-size key block where all entries share the same key size and value type.
///
/// No offset table is written — entry positions are computed arithmetically from the stride.
struct FixedKeyBlockBuilder<'l> {
    /// Destination buffer the block is serialized into.
    buffer: &'l mut Vec<u8>,
}
1134
1135impl<'l> FixedKeyBlockBuilder<'l> {
1136    fn new(
1137        buffer: &'l mut Vec<u8>,
1138        entry_count: u32,
1139        has_hash: bool,
1140        key_size: u8,
1141        value_type: EntryType,
1142    ) -> Self {
1143        let hash_len: usize = if has_hash { 8 } else { 0 };
1144        let val_size = value_type_val_size(value_type);
1145        let stride = hash_len + key_size as usize + val_size;
1146        buffer.reserve(FIXED_KEY_BLOCK_HEADER_SIZE + entry_count as usize * stride);
1147
1148        let block_type = if has_hash {
1149            BLOCK_TYPE_FIXED_KEY_WITH_HASH
1150        } else {
1151            BLOCK_TYPE_FIXED_KEY_NO_HASH
1152        };
1153        buffer.extend_from_slice(&[
1154            block_type,
1155            (entry_count >> 16) as u8,
1156            (entry_count >> 8) as u8,
1157            entry_count as u8,
1158            key_size,
1159            value_type.0,
1160        ]);
1161
1162        Self { buffer }
1163    }
1164
1165    /// Writes a single entry (hash + key + value data) to the block.
1166    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
1167        if has_hash {
1168            self.buffer
1169                .extend_from_slice(&entry.key_hash().to_be_bytes());
1170        }
1171        entry.write_key_to(self.buffer);
1172        value_ref.write_value_to(self.buffer);
1173    }
1174
1175    fn finish(self) -> &'l mut Vec<u8> {
1176        self.buffer
1177    }
1178}
1179
1180/// Returns the value size for a given entry type (builder-side, infallible).
1181///
1182/// This mirrors `entry_val_size` in the reader but panics on invalid types since the builder
1183/// only produces valid types.
1184fn value_type_val_size(ty: EntryType) -> usize {
1185    match ty.0 {
1186        KEY_BLOCK_ENTRY_TYPE_SMALL => SMALL_VALUE_REF_SIZE,
1187        KEY_BLOCK_ENTRY_TYPE_MEDIUM => MEDIUM_VALUE_REF_SIZE,
1188        KEY_BLOCK_ENTRY_TYPE_BLOB => BLOB_VALUE_REF_SIZE,
1189        KEY_BLOCK_ENTRY_TYPE_DELETED => DELETED_VALUE_REF_SIZE,
1190        ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
1191            (ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize
1192        }
1193        _ => panic!("Invalid key block entry type: {:?}", ty),
1194    }
1195}
1196
1197// ---------------------------------------------------------------------------
1198// IndexBlockBuilder
1199// ---------------------------------------------------------------------------
1200
1201/// Builder for a single index block.
1202struct IndexBlockBuilder<W: Write> {
1203    writer: W,
1204}
1205
1206/// Size of a single index block entry (u64 hash + u16 block index).
1207pub(crate) const INDEX_BLOCK_ENTRY_SIZE: usize = size_of::<u64>() + size_of::<u16>();
1208
1209/// Size of the index block header (u8 type + u16 first_block).
1210pub(crate) const INDEX_BLOCK_HEADER_SIZE: usize = size_of::<u8>() + size_of::<u16>();
1211
1212impl<W: Write> IndexBlockBuilder<W> {
1213    /// Creates a new builder for an index block with the specified number of entries and a pointer
1214    /// to the first block.
1215    fn new(mut writer: W, first_block: u16) -> Self {
1216        writer.write_u8(BLOCK_TYPE_INDEX).unwrap();
1217        writer.write_u16::<BE>(first_block).unwrap();
1218        Self { writer }
1219    }
1220
1221    /// Adds a hash boundary to the index block.
1222    fn put(&mut self, hash: u64, block: u16) {
1223        self.writer.write_u64::<BE>(hash).unwrap();
1224        self.writer.write_u16::<BE>(block).unwrap();
1225    }
1226}
1227
1228#[cfg(test)]
1229mod tests {
1230    use super::*;
1231    use crate::{
1232        key::hash_key,
1233        lookup_entry::LookupValue,
1234        static_sorted_file::{
1235            BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData,
1236        },
1237    };
1238
    /// Creates a small `BlockCache` for tests: 4 MiB capacity with default weigher/hasher
    /// settings.
    // NOTE(review): the leading `100` looks like an item-count bound — confirm against
    // `BlockCache::with`.
    fn make_cache() -> BlockCache {
        BlockCache::with(
            100,
            4 * 1024 * 1024,
            Default::default(),
            Default::default(),
            Default::default(),
        )
    }
1248
    /// A simple entry type for testing with configurable value type.
    struct TestEntry {
        /// Raw key bytes.
        key: Vec<u8>,
        /// Hash of `key`, precomputed via `hash_key` at construction time.
        hash: u64,
        /// Which kind of value this entry carries (inline/small/medium/blob/deleted).
        value_kind: TestValueKind,
    }
1255
    /// The different value shapes a [`TestEntry`] can exercise in the writer.
    enum TestValueKind {
        /// Value stored directly in the key block.
        Inline(Vec<u8>),
        /// Value stored in a shared small value block.
        Small(Vec<u8>),
        /// Value stored in its own medium value block.
        Medium(Vec<u8>),
        /// Already-formatted block with `uncompressed_size = 0` (stored as-is).
        MediumRaw(Vec<u8>),
        /// Reference to an external blob by sequence number.
        Blob(u32),
        /// Tombstone entry.
        Deleted,
    }
1265
1266    impl TestEntry {
1267        fn new(key: &[u8], value_kind: TestValueKind) -> Self {
1268            let key = key.to_vec();
1269            let hash = hash_key(&key);
1270            Self {
1271                key,
1272                hash,
1273                value_kind,
1274            }
1275        }
1276
1277        fn small(key: &[u8], value: &[u8]) -> Self {
1278            Self::new(key, TestValueKind::Small(value.to_vec()))
1279        }
1280
1281        fn inline(key: &[u8], value: &[u8]) -> Self {
1282            debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
1283            Self::new(key, TestValueKind::Inline(value.to_vec()))
1284        }
1285
1286        fn medium(key: &[u8], value: &[u8]) -> Self {
1287            Self::new(key, TestValueKind::Medium(value.to_vec()))
1288        }
1289
1290        fn blob(key: &[u8], blob_id: u32) -> Self {
1291            Self::new(key, TestValueKind::Blob(blob_id))
1292        }
1293
1294        fn deleted(key: &[u8]) -> Self {
1295            Self::new(key, TestValueKind::Deleted)
1296        }
1297
1298        fn medium_raw(key: &[u8], value: &[u8]) -> Self {
1299            Self::new(key, TestValueKind::MediumRaw(value.to_vec()))
1300        }
1301
1302        fn expected_value(&self) -> Option<&[u8]> {
1303            match &self.value_kind {
1304                TestValueKind::Inline(v)
1305                | TestValueKind::Small(v)
1306                | TestValueKind::Medium(v)
1307                | TestValueKind::MediumRaw(v) => Some(v),
1308                _ => None,
1309            }
1310        }
1311    }
1312
1313    impl Entry for TestEntry {
1314        fn key_hash(&self) -> u64 {
1315            self.hash
1316        }
1317
1318        fn key_len(&self) -> usize {
1319            self.key.len()
1320        }
1321
1322        fn write_key_to(&self, buf: &mut Vec<u8>) {
1323            buf.extend_from_slice(&self.key);
1324        }
1325
1326        fn value(&self) -> EntryValue<'_> {
1327            match &self.value_kind {
1328                TestValueKind::Inline(v) => EntryValue::Inline { value: v },
1329                TestValueKind::Small(v) => EntryValue::Small { value: v },
1330                TestValueKind::Medium(v) => EntryValue::Medium { value: v },
1331                TestValueKind::MediumRaw(v) => EntryValue::MediumRaw {
1332                    // uncompressed_size = 0 means the block is stored as-is (no compression).
1333                    uncompressed_size: 0,
1334                    checksum: checksum_block(v),
1335                    block: v,
1336                },
1337                TestValueKind::Blob(id) => EntryValue::Large { blob: *id },
1338                TestValueKind::Deleted => EntryValue::Deleted,
1339            }
1340        }
1341    }
1342
1343    /// Sort entries by hash (required by SST writer).
1344    fn sort_entries(entries: &mut [TestEntry]) {
1345        entries.sort_by_key(|e| e.hash);
1346    }
1347
1348    /// Open an SST file for lookup given a path and metadata.
1349    fn open_sst(
1350        dir: &Path,
1351        seq: u32,
1352        meta: &StaticSortedFileBuilderMeta<'_>,
1353    ) -> Result<StaticSortedFile> {
1354        StaticSortedFile::open(
1355            dir,
1356            StaticSortedFileMetaData {
1357                sequence_number: seq,
1358                block_count: meta.block_count,
1359            },
1360        )
1361    }
1362
1363    /// Helper: write entries via StreamingSstWriter, return meta.
1364    fn write_sst(
1365        dir: &Path,
1366        seq: u32,
1367        entries: &[TestEntry],
1368        flags: MetaEntryFlags,
1369    ) -> Result<StaticSortedFileBuilderMeta<'static>> {
1370        let sst_path = dir.join(format!("{seq:08}.sst"));
1371        let mut writer = StreamingSstWriter::new(&sst_path, flags, entries.len() as u64)?;
1372        for entry in entries {
1373            writer.add(entry)?;
1374        }
1375        let (meta, _file) = writer.close()?;
1376        Ok(meta)
1377    }
1378
    /// Lookup a key in an SST file and assert it matches the expected value kind.
    // NOTE(review): the `false` const generic on `lookup` toggles some reader behavior
    // not visible here — confirm against `StaticSortedFile::lookup`.
    fn assert_lookup(
        sst: &StaticSortedFile,
        entry: &TestEntry,
        kc: &BlockCache,
        vc: &BlockCache,
    ) -> Result<()> {
        let result = sst.lookup::<_, false>(entry.hash, &entry.key, kc, vc)?;
        // Arm order matters: the first arm accepts *any* value kind whose lookup produced a
        // single byte slice (inline/small/medium/medium-raw) and compares payloads; blob and
        // deleted results are matched by their specific kinds below.
        match (&entry.value_kind, result) {
            (_, SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Slice { .. }) =>
            {
                let LookupValue::Slice { value } = &values[0] else {
                    unreachable!()
                };
                // A Slice result for a Blob/Deleted entry fails here: no expected payload.
                let expected = entry
                    .expected_value()
                    .expect("Got Slice but entry has no value");
                assert_eq!(
                    value.as_ref(),
                    expected,
                    "value mismatch for key {:?}",
                    std::str::from_utf8(&entry.key)
                );
            }
            (TestValueKind::Blob(expected_id), SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Blob { .. }) =>
            {
                let LookupValue::Blob { sequence_number } = &values[0] else {
                    unreachable!()
                };
                assert_eq!(*sequence_number, *expected_id);
            }
            (TestValueKind::Deleted, SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Deleted) => {}
            _ => {
                panic!(
                    "Unexpected lookup result for key {:?}",
                    std::str::from_utf8(&entry.key)
                );
            }
        }
        Ok(())
    }
1423
1424    #[test]
1425    fn single_inline_entry() -> Result<()> {
1426        let dir = tempfile::tempdir()?;
1427        let mut entries = vec![TestEntry::inline(b"key1", b"val1")];
1428        sort_entries(&mut entries);
1429
1430        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1431        assert_eq!(meta.entries, 1);
1432
1433        let sst = open_sst(dir.path(), 1, &meta)?;
1434        let kc = make_cache();
1435        let vc = make_cache();
1436        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1437        Ok(())
1438    }
1439
1440    #[test]
1441    fn single_small_entry() -> Result<()> {
1442        let dir = tempfile::tempdir()?;
1443        let value = vec![0xAB; 100]; // > MAX_INLINE_VALUE_SIZE, <= MAX_SMALL_VALUE_SIZE
1444        let mut entries = vec![TestEntry::small(b"skey", &value)];
1445        sort_entries(&mut entries);
1446
1447        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1448        assert_eq!(meta.entries, 1);
1449
1450        let sst = open_sst(dir.path(), 1, &meta)?;
1451        let kc = make_cache();
1452        let vc = make_cache();
1453        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1454        Ok(())
1455    }
1456
1457    #[test]
1458    fn single_medium_entry() -> Result<()> {
1459        let dir = tempfile::tempdir()?;
1460        let value = vec![0xCD; 8192]; // > MAX_SMALL_VALUE_SIZE
1461        let mut entries = vec![TestEntry::medium(b"mkey", &value)];
1462        sort_entries(&mut entries);
1463
1464        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1465        assert_eq!(meta.entries, 1);
1466
1467        let sst = open_sst(dir.path(), 1, &meta)?;
1468        let kc = make_cache();
1469        let vc = make_cache();
1470        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1471        Ok(())
1472    }
1473
1474    #[test]
1475    fn single_blob_entry() -> Result<()> {
1476        let dir = tempfile::tempdir()?;
1477        let mut entries = vec![TestEntry::blob(b"bkey", 42)];
1478        sort_entries(&mut entries);
1479
1480        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1481        assert_eq!(meta.entries, 1);
1482
1483        let sst = open_sst(dir.path(), 1, &meta)?;
1484        let kc = make_cache();
1485        let vc = make_cache();
1486        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1487        Ok(())
1488    }
1489
1490    #[test]
1491    fn single_deleted_entry() -> Result<()> {
1492        let dir = tempfile::tempdir()?;
1493        let mut entries = vec![TestEntry::deleted(b"dkey")];
1494        sort_entries(&mut entries);
1495
1496        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1497        assert_eq!(meta.entries, 1);
1498
1499        let sst = open_sst(dir.path(), 1, &meta)?;
1500        let kc = make_cache();
1501        let vc = make_cache();
1502        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1503        Ok(())
1504    }
1505
1506    #[test]
1507    fn many_small_values() -> Result<()> {
1508        let dir = tempfile::tempdir()?;
1509        // Create enough small entries to trigger multiple small value block flushes.
1510        // MIN_SMALL_VALUE_BLOCK_SIZE = 8KB, each value is 200 bytes -> ~40 entries per block.
1511        let count = 200;
1512        let mut entries: Vec<TestEntry> = (0..count)
1513            .map(|i| {
1514                let key = format!("key-{i:04}");
1515                let value = vec![(i & 0xFF) as u8; 200];
1516                TestEntry::small(key.as_bytes(), &value)
1517            })
1518            .collect();
1519        sort_entries(&mut entries);
1520
1521        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1522        assert_eq!(meta.entries, count as u64);
1523
1524        let sst = open_sst(dir.path(), 1, &meta)?;
1525        let kc = make_cache();
1526        let vc = make_cache();
1527
1528        for entry in &entries {
1529            assert_lookup(&sst, entry, &kc, &vc)?;
1530        }
1531        Ok(())
1532    }
1533
1534    #[test]
1535    fn many_medium_values() -> Result<()> {
1536        let dir = tempfile::tempdir()?;
1537        let count = 50;
1538        let mut entries: Vec<TestEntry> = (0..count)
1539            .map(|i| {
1540                let key = format!("mkey-{i:04}");
1541                let value = vec![(i & 0xFF) as u8; 8192];
1542                TestEntry::medium(key.as_bytes(), &value)
1543            })
1544            .collect();
1545        sort_entries(&mut entries);
1546
1547        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1548        assert_eq!(meta.entries, count as u64);
1549
1550        let sst = open_sst(dir.path(), 1, &meta)?;
1551        let kc = make_cache();
1552        let vc = make_cache();
1553
1554        for entry in &entries {
1555            assert_lookup(&sst, entry, &kc, &vc)?;
1556        }
1557        Ok(())
1558    }
1559
1560    #[test]
1561    fn mixed_value_types() -> Result<()> {
1562        let dir = tempfile::tempdir()?;
1563        let mut entries = vec![
1564            TestEntry::inline(b"a-inline", b"tiny"),
1565            TestEntry::small(b"b-small", &[0x11; 200]),
1566            TestEntry::medium(b"c-medium", &[0x22; 8192]),
1567            TestEntry::blob(b"d-blob", 99),
1568            TestEntry::deleted(b"e-deleted"),
1569            TestEntry::small(b"f-small2", &[0x33; 300]),
1570            TestEntry::inline(b"g-inline2", b"mini"),
1571            TestEntry::medium(b"h-medium2", &[0x44; 16384]),
1572        ];
1573        sort_entries(&mut entries);
1574
1575        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1576        assert_eq!(meta.entries, 8);
1577
1578        let sst = open_sst(dir.path(), 1, &meta)?;
1579        let kc = make_cache();
1580        let vc = make_cache();
1581
1582        for entry in &entries {
1583            assert_lookup(&sst, entry, &kc, &vc)?;
1584        }
1585        Ok(())
1586    }
1587
1588    #[test]
1589    fn is_full_entry_count_limit() {
1590        let dir = tempfile::tempdir().unwrap();
1591        let sst_path = dir.path().join("test.sst");
1592        let mut writer =
1593            StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();
1594
1595        let max_entries = 50;
1596        for i in 0..max_entries {
1597            let key = format!("k{i:06}");
1598            let entry = TestEntry::inline(key.as_bytes(), &[0; 4]);
1599            writer.add(entry).unwrap();
1600        }
1601
1602        assert_eq!(writer.entry_count, max_entries as u64);
1603        assert!(
1604            writer.is_full(max_entries, usize::MAX),
1605            "Should be full when entry count reaches max_entries"
1606        );
1607        assert!(
1608            !writer.is_full(max_entries + 1, usize::MAX),
1609            "Should not be full when limit is higher"
1610        );
1611        writer.close().unwrap();
1612    }
1613
1614    #[test]
1615    fn is_full_data_size_limit() {
1616        let dir = tempfile::tempdir().unwrap();
1617        let sst_path = dir.path().join("test.sst");
1618        let mut writer =
1619            StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();
1620
1621        let value = vec![0u8; 1000];
1622        for i in 0..10 {
1623            let key = format!("k{i:06}");
1624            let entry = TestEntry::small(key.as_bytes(), &value);
1625            writer.add(entry).unwrap();
1626        }
1627
1628        let total = writer.total_key_size + writer.total_value_size;
1629        assert!(total > 10_000, "total data should exceed 10KB");
1630        assert!(writer.is_full(usize::MAX, total - 1));
1631        assert!(!writer.is_full(usize::MAX, total + 1));
1632        writer.close().unwrap();
1633    }
1634
    /// The one-shot `write_static_stored_file` convenience function and the
    /// incremental `StreamingSstWriter` must produce equivalent SST files:
    /// identical metadata and identical lookup results for every entry.
    #[test]
    fn write_static_stored_file_matches_streaming() -> Result<()> {
        let dir = tempfile::tempdir()?;

        // 100 entries cycling through inline / small / medium value types so
        // all three slice-backed representations are compared.
        let mut entries: Vec<TestEntry> = (0..100)
            .map(|i| {
                let key = format!("rkey-{i:04}");
                if i % 3 == 0 {
                    TestEntry::inline(key.as_bytes(), &[(i & 0xFF) as u8; 4])
                } else if i % 3 == 1 {
                    TestEntry::small(key.as_bytes(), &[(i & 0xFF) as u8; 200])
                } else {
                    TestEntry::medium(key.as_bytes(), &[(i & 0xFF) as u8; 8192])
                }
            })
            .collect();
        sort_entries(&mut entries);

        // Write via convenience function
        let batch_path = dir.path().join("00000001.sst");
        let (meta1, _) =
            write_static_stored_file(&entries, &batch_path, MetaEntryFlags::default())?;

        // Write via streaming API
        let streaming_path = dir.path().join("00000002.sst");
        let mut writer = StreamingSstWriter::new(
            &streaming_path,
            MetaEntryFlags::default(),
            entries.len() as u64,
        )?;
        for entry in &entries {
            writer.add(entry)?;
        }
        let (meta2, _) = writer.close()?;

        // Metadata should match
        assert_eq!(meta1.entries, meta2.entries);
        assert_eq!(meta1.min_hash, meta2.min_hash);
        assert_eq!(meta1.max_hash, meta2.max_hash);
        assert_eq!(meta1.block_count, meta2.block_count);

        // Both files should produce the same lookup results
        let sst1 = StaticSortedFile::open(
            dir.path(),
            StaticSortedFileMetaData {
                sequence_number: 1,
                block_count: meta1.block_count,
            },
        )?;
        let sst2 = StaticSortedFile::open(
            dir.path(),
            StaticSortedFileMetaData {
                sequence_number: 2,
                block_count: meta2.block_count,
            },
        )?;
        let kc = make_cache();
        let vc = make_cache();

        for entry in &entries {
            let r1 = sst1.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
            let r2 = sst2.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
            // Every key was written, so both lookups must yield exactly one
            // found value; any other shape is a mismatch between the writers.
            match (&r1, &r2) {
                (SstLookupResult::Found(v1), SstLookupResult::Found(v2))
                    if v1.len() == 1 && v2.len() == 1 =>
                {
                    match (&v1[0], &v2[0]) {
                        // Slice-backed values: compare the raw bytes.
                        (
                            LookupValue::Slice { value: val1 },
                            LookupValue::Slice { value: val2 },
                        ) => {
                            assert_eq!(
                                val1.as_ref(),
                                val2.as_ref(),
                                "Value mismatch for key {:?}",
                                std::str::from_utf8(&entry.key)
                            );
                        }
                        (LookupValue::Deleted, LookupValue::Deleted) => {}
                        // Blob values are references; matching sequence numbers
                        // mean both files point at the same external blob.
                        (
                            LookupValue::Blob {
                                sequence_number: s1,
                            },
                            LookupValue::Blob {
                                sequence_number: s2,
                            },
                        ) => {
                            assert_eq!(s1, s2);
                        }
                        // The two files returned different value variants.
                        _ => panic!(
                            "Mismatched results for key {:?}",
                            std::str::from_utf8(&entry.key)
                        ),
                    }
                }
                // Not found, or an unexpected number of values, in either file.
                _ => panic!(
                    "Mismatched results for key {:?}",
                    std::str::from_utf8(&entry.key)
                ),
            }
        }
        Ok(())
    }
1738
1739    #[test]
1740    #[should_panic(expected = "StreamingSstWriter::close() called with no entries")]
1741    fn close_empty_writer_panics() {
1742        let dir = tempfile::tempdir().unwrap();
1743        let sst_path = dir.path().join("empty.sst");
1744        let writer =
1745            StreamingSstWriter::<TestEntry>::new(&sst_path, MetaEntryFlags::default(), 0).unwrap();
1746        writer.close().unwrap();
1747    }
1748
1749    #[test]
1750    fn key_block_boundary_at_max_entries() -> Result<()> {
1751        let dir = tempfile::tempdir()?;
1752        let count = MAX_KEY_BLOCK_ENTRIES + 1;
1753        let mut entries: Vec<TestEntry> = (0..count)
1754            .map(|i| {
1755                let key = format!("boundary-{i:06}");
1756                TestEntry::inline(key.as_bytes(), &[0u8; 4])
1757            })
1758            .collect();
1759        sort_entries(&mut entries);
1760
1761        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1762        assert_eq!(meta.entries, count as u64);
1763        // count > MAX_KEY_BLOCK_ENTRIES so we need at least 2 key blocks plus 1 index block
1764        assert!(
1765            meta.block_count >= 3,
1766            "expected at least 2 key blocks + 1 index block"
1767        );
1768
1769        let sst = open_sst(dir.path(), 1, &meta)?;
1770        let kc = make_cache();
1771        let vc = make_cache();
1772        for entry in &entries {
1773            assert_lookup(&sst, entry, &kc, &vc)?;
1774        }
1775        Ok(())
1776    }
1777
1778    #[test]
1779    fn single_medium_raw_entry() -> Result<()> {
1780        let dir = tempfile::tempdir()?;
1781        let value = vec![0xBE; 8192];
1782        let mut entries = vec![TestEntry::medium_raw(b"rkey", &value)];
1783        sort_entries(&mut entries);
1784
1785        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1786        assert_eq!(meta.entries, 1);
1787
1788        let sst = open_sst(dir.path(), 1, &meta)?;
1789        let kc = make_cache();
1790        let vc = make_cache();
1791        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1792        Ok(())
1793    }
1794
1795    /// Flip a single byte in an SST file at the given position.
1796    fn corrupt_sst_byte(dir: &Path, seq: u32, pos: u64) {
1797        use std::io::{Seek, SeekFrom, Write as _};
1798
1799        let sst_path = dir.join(format!("{seq:08}.sst"));
1800        let file_bytes = std::fs::read(&sst_path).unwrap();
1801        let original = file_bytes[pos as usize];
1802        let mut file = std::fs::OpenOptions::new()
1803            .write(true)
1804            .open(&sst_path)
1805            .unwrap();
1806        file.seek(SeekFrom::Start(pos)).unwrap();
1807        file.write_all(&[original ^ 0xFF]).unwrap();
1808        file.sync_all().unwrap();
1809    }
1810
1811    /// Assert that looking up the first entry in a corrupted SST returns a corruption error.
1812    fn assert_corruption_detected(
1813        dir: &Path,
1814        seq: u32,
1815        meta: &StaticSortedFileBuilderMeta<'_>,
1816        entries: &[TestEntry],
1817    ) {
1818        let sst = open_sst(dir, seq, meta).unwrap();
1819        let kc = make_cache();
1820        let vc = make_cache();
1821        match sst.lookup::<_, false>(entries[0].hash, &entries[0].key, &kc, &vc) {
1822            Err(err) => {
1823                let msg = format!("{err}");
1824                assert!(
1825                    msg.contains("corruption"),
1826                    "Expected corruption error, got: {msg}"
1827                );
1828            }
1829            Ok(_) => panic!("Expected checksum error, but lookup succeeded"),
1830        }
1831    }
1832
1833    #[test]
1834    fn checksum_detects_corrupted_compressed_block() {
1835        let dir = tempfile::tempdir().unwrap();
1836        // Medium value is large enough to get its own value block, which will be compressed
1837        let value = vec![0xCD; 8192];
1838        let entries = vec![TestEntry::medium(b"mkey", &value)];
1839
1840        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();
1841
1842        // Corrupt the stored checksum of the first block (bytes 4..8).
1843        // This guarantees a mismatch regardless of whether LZ4 decompression succeeds.
1844        corrupt_sst_byte(dir.path(), 1, 4);
1845        assert_corruption_detected(dir.path(), 1, &meta, &entries);
1846    }
1847
1848    #[test]
1849    fn checksum_detects_corrupted_uncompressed_block() {
1850        let dir = tempfile::tempdir().unwrap();
1851        // Single inline entry - the key block will be small and likely stored uncompressed
1852        let entries = vec![TestEntry::inline(b"key1", b"val1")];
1853
1854        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();
1855
1856        // Corrupt a byte in the first block's data (after the 8-byte header)
1857        corrupt_sst_byte(dir.path(), 1, BLOCK_HEADER_SIZE as u64 + 1);
1858        assert_corruption_detected(dir.path(), 1, &meta, &entries);
1859    }
1860}