Skip to main content

turbo_persistence/
static_sorted_file_builder.rs

1use std::{
2    borrow::Cow,
3    collections::VecDeque,
4    fs::File,
5    io::{BufWriter, Write},
6    path::Path,
7};
8
9use anyhow::{Context, Result};
10use byteorder::{BE, ByteOrder, WriteBytesExt};
11use turbo_bincode::turbo_bincode_encode;
12
13use crate::{
14    compression::{checksum_block, compress_into_buffer},
15    constants::{MAX_INLINE_VALUE_SIZE, MAX_SMALL_VALUE_SIZE, MIN_SMALL_VALUE_BLOCK_SIZE},
16    meta_file::{AmqfBincodeWrapper, MetaEntryFlags},
17    static_sorted_file::{
18        BLOB_VALUE_REF_SIZE, BLOCK_TYPE_FIXED_KEY_NO_HASH, BLOCK_TYPE_FIXED_KEY_WITH_HASH,
19        BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY_NO_HASH, BLOCK_TYPE_KEY_WITH_HASH, DELETED_VALUE_REF_SIZE,
20        KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, KEY_BLOCK_ENTRY_TYPE_INLINE_MIN,
21        KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, MEDIUM_VALUE_REF_SIZE,
22        SMALL_VALUE_REF_SIZE,
23    },
24};
25
/// Size of the per-block header on disk: 4 bytes uncompressed_size + 4 bytes CRC32 checksum.
pub const BLOCK_HEADER_SIZE: usize = 8;

/// The maximum number of entries that should go into a single key block
/// (16 KiB / 20 bytes overhead = 819 entries).
const MAX_KEY_BLOCK_ENTRIES: usize = MAX_KEY_BLOCK_SIZE / KEY_BLOCK_ENTRY_META_OVERHEAD;
/// The maximum bytes that should go into a single key block
// Note this must fit into 3 bytes length
const MAX_KEY_BLOCK_SIZE: usize = 16 * 1024;
/// Overhead of bytes that should be counted for entries in a key block in addition to the key size.
/// This covers the worst case (small values):
/// - 1 byte type (key block header)
/// - 3 bytes position (key block header)
/// - 8 bytes hash (optional, but unknown at collection time)
/// - 2 bytes block index
/// - 2 bytes size
/// - 4 bytes position in block
///
/// (1 + 3 + 8 + 2 + 2 + 4 = 20)
const KEY_BLOCK_ENTRY_META_OVERHEAD: usize = 20;
/// The aimed false positive rate for the AMQF
const AMQF_FALSE_POSITIVE_RATE: f64 = 0.01;
/// Assumed average small value size for pre-allocation estimates.
/// Intentionally conservative (small values range from MAX_INLINE_VALUE_SIZE+1 to
/// MAX_SMALL_VALUE_SIZE = 4096): a low estimate over-counts value blocks, which is
/// preferable to under-allocating vectors.
const AVG_SMALL_VALUE_SIZE: usize = 64;

/// Safety margin for block index capacity estimation in
/// [`StreamingSstWriter::has_block_index_capacity`]. Accounts for rounding in the entry-count and
/// byte-size based estimates of pending key blocks.
const BLOCK_INDEX_CAPACITY_BUFFER: usize = 16;

/// Minimum key size (in bytes) for attempting LZ4 compression on key blocks.
///
/// Keys are sorted by hash, so we should not expect correlation in the data between nearby keys in
/// a block. For small keys (below this threshold), compression is unlikely to be able to exploit
/// patterns and only wastes CPU time. We skip the compression attempt entirely in this case.
const MIN_KEY_SIZE_FOR_COMPRESSION: usize = 16;

/// Maximum key length that can use fixed-size key block layout.
///
/// The on-disk fixed-key header stores the key size as a single byte, so keys longer than this
/// fall back to variable-size layout.
const MAX_FIXED_KEY_LEN: usize = u8::MAX as usize;
68
/// Newtype for the key block entry type byte.
///
/// This encodes what kind of value reference an entry has (small, medium, blob, deleted, or
/// inline with embedded length). See `KEY_BLOCK_ENTRY_TYPE_*` constants.
///
/// `PartialEq`/`Eq` are required by [`KeyBlockFormat::update`], which compares entry types to
/// decide whether a block still qualifies for the fixed-size layout.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EntryType(u8);
75
/// Tracks whether a key block's entries are uniform enough for fixed-size layout.
///
/// State transitions:
/// - `Unknown` → first entry → `Fixed { key_len, value_type }`
/// - `Fixed` + matching entry → stays `Fixed`
/// - `Fixed` + mismatched key_len or value_type → `Variable`
/// - `Variable` → stays `Variable`
///
/// Once `Variable`, the format never returns to `Fixed` (the lattice only moves down).
#[derive(Clone, Copy)]
enum KeyBlockFormat {
    /// No entries yet — format undetermined.
    Unknown,
    /// All entries so far have uniform key length and value type.
    Fixed { key_len: u8, value_type: EntryType },
    /// Entries have mixed key lengths or value types; must use offset table.
    Variable,
}
92
93impl KeyBlockFormat {
94    /// Updates the format after seeing an entry with the given key length and value type.
95    ///
96    /// A `Fixed` state is only reachable when all entries have matching key length and value type,
97    /// and the key length fits in a u8 (required by the on-disk header).
98    fn update(&mut self, key_len: usize, value_type: EntryType) {
99        *self = match *self {
100            KeyBlockFormat::Unknown => {
101                if key_len <= MAX_FIXED_KEY_LEN {
102                    KeyBlockFormat::Fixed {
103                        key_len: key_len as u8,
104                        value_type,
105                    }
106                } else {
107                    KeyBlockFormat::Variable
108                }
109            }
110            KeyBlockFormat::Fixed {
111                key_len: k,
112                value_type: v,
113            } if k as usize == key_len && v == value_type => KeyBlockFormat::Fixed {
114                key_len: k,
115                value_type: v,
116            },
117            KeyBlockFormat::Fixed { .. } | KeyBlockFormat::Variable => KeyBlockFormat::Variable,
118        };
119    }
120}
121
/// Copy-able snapshot of the accumulator state needed by [`flush_key_block`].
#[derive(Clone, Copy)]
struct KeyBlockFlushInfo {
    /// Maximum key length among entries in the block (decides hash storage via [`use_hash`]).
    max_key_len: usize,
    /// Whether the block qualifies for fixed-size layout.
    format: KeyBlockFormat,
}
128
/// Tracks the accumulated state of the current incomplete key block.
///
/// During streaming, this sits on [`StreamingSstWriter`] and tracks the tail of the resolved
/// prefix. Entries are added one at a time; when [`should_flush`](Self::should_flush) returns
/// `true`, the caller should flush the block and call [`reset`](Self::reset).
struct KeyBlockAccumulator {
    /// Accumulated byte size (keys + per-entry overhead) of entries in this block.
    /// Compared against `MAX_KEY_BLOCK_SIZE` when deciding to flush.
    size: usize,
    /// Number of entries accumulated so far.
    entry_count: usize,
    /// Maximum key length among accumulated entries (determines whether hashes are stored).
    max_key_len: usize,
    /// Hash of the most recently added entry (used to avoid splitting entries with equal hashes
    /// across blocks).
    last_hash: u64,
    /// Whether the block qualifies for fixed-size layout.
    format: KeyBlockFormat,
}
147
148impl KeyBlockAccumulator {
149    fn new() -> Self {
150        Self {
151            size: 0,
152            entry_count: 0,
153            max_key_len: 0,
154            last_hash: 0,
155            format: KeyBlockFormat::Unknown,
156        }
157    }
158
159    /// Records a new entry in the accumulator.
160    fn add(&mut self, key_len: usize, key_hash: u64, value_type: EntryType) {
161        self.size += key_len + KEY_BLOCK_ENTRY_META_OVERHEAD;
162        self.max_key_len = self.max_key_len.max(key_len);
163        self.entry_count += 1;
164        self.last_hash = key_hash;
165        self.format.update(key_len, value_type);
166    }
167
168    /// Snapshots the state needed by `flush_key_block`.
169    fn flush_info(&self) -> KeyBlockFlushInfo {
170        KeyBlockFlushInfo {
171            max_key_len: self.max_key_len,
172            format: self.format,
173        }
174    }
175
176    /// Returns `true` if the block should be flushed before adding an entry with the given key
177    /// length and hash. Returns `false` for empty blocks and when the next entry shares its hash
178    /// with the current last entry (to avoid splitting equal-hash runs).
179    fn should_flush(&self, next_key_len: usize, next_key_hash: u64) -> bool {
180        if self.entry_count == 0 {
181            return false;
182        }
183        let would_exceed_size =
184            self.size + next_key_len + KEY_BLOCK_ENTRY_META_OVERHEAD > MAX_KEY_BLOCK_SIZE;
185        let would_exceed_entries = self.entry_count >= MAX_KEY_BLOCK_ENTRIES;
186        // Never split entries with the same hash across blocks.
187        (would_exceed_size || would_exceed_entries) && self.last_hash != next_key_hash
188    }
189
190    /// Resets the accumulator for a new key block.
191    fn reset(&mut self) {
192        self.size = 0;
193        self.entry_count = 0;
194        self.max_key_len = 0;
195        self.format = KeyBlockFormat::Unknown;
196        // last_hash is intentionally not reset -- it is overwritten on the next add() call.
197    }
198}
199
/// Determines whether to store the hash per entry based on max key length.
///
/// For short keys (32 bytes or less) comparing the key itself is cheap enough that the
/// per-entry 8-byte hash is not stored.
fn use_hash(max_key_len: usize) -> bool {
    /// Keys at or below this length do not get a stored hash.
    const STORED_HASH_KEY_LEN_THRESHOLD: usize = 32;
    max_key_len > STORED_HASH_KEY_LEN_THRESHOLD
}
204
/// Trait for entries from which SST files can be created
pub trait Entry {
    /// Returns the hash of the key
    fn key_hash(&self) -> u64;
    /// Returns the length of the key
    fn key_len(&self) -> usize;
    /// Writes the key to a buffer
    ///
    /// NOTE(review): block sizing elsewhere is derived from `key_len()`, so this presumably must
    /// append exactly `key_len()` bytes — confirm against implementors.
    fn write_key_to(&self, buf: &mut Vec<u8>);

    /// Returns the value
    fn value(&self) -> EntryValue<'_>;
}
217
/// Forwarding impl so a `&E` can be used wherever an `Entry` is required
/// (e.g. [`write_static_stored_file`] feeding borrowed slice elements into the writer).
impl<E: Entry> Entry for &E {
    fn key_hash(&self) -> u64 {
        (*self).key_hash()
    }
    fn key_len(&self) -> usize {
        (*self).key_len()
    }
    fn write_key_to(&self, buf: &mut Vec<u8>) {
        (*self).write_key_to(buf)
    }
    fn value(&self) -> EntryValue<'_> {
        (*self).value()
    }
}
232
/// Reference to a value
///
/// `Copy` is cheap: all variants hold only borrows or small integers.
#[derive(Copy, Clone)]
pub enum EntryValue<'l> {
    /// Inline value stored directly in the key block.
    Inline { value: &'l [u8] },
    /// Small-sized value. They are stored in shared value blocks.
    Small { value: &'l [u8] },
    /// Medium-sized value. They are stored in their own value block.
    Medium { value: &'l [u8] },
    /// Medium-sized value. They are stored in their own value block. In the raw form as on disk.
    /// Written verbatim, skipping re-compression and re-checksumming.
    MediumRaw {
        /// The uncompressed size of the block data. `0` means the block is stored uncompressed
        /// (and thus the size is the `len` of the block)
        uncompressed_size: u32,
        /// CRC32 checksum of the on-disk block data (after compression).
        checksum: u32,
        block: &'l [u8],
    },
    /// Large-sized value. They are stored in a blob file.
    Large { blob: u32 },
    /// Tombstone. The value was removed.
    Deleted,
}
256
/// Metadata describing a finished SST file, returned alongside the file handle.
#[derive(Debug, Clone)]
pub struct StaticSortedFileBuilderMeta<'a> {
    /// The minimum hash of the keys in the SST file
    pub min_hash: u64,
    /// The maximum hash of the keys in the SST file
    pub max_hash: u64,
    /// The AMQF data
    pub amqf: Cow<'a, [u8]>,
    /// The number of blocks in the SST file
    pub block_count: u16,
    /// The file size of the SST file
    pub size: u64,
    /// The status flags for this SST file
    pub flags: MetaEntryFlags,
    /// The number of entries in the SST file
    pub entries: u64,
}
274
275/// Writes an SST file from a pre-sorted slice of entries.
276///
277/// This is a convenience wrapper around [`StreamingSstWriter`] for callers that already have all
278/// entries in memory.
279// TODO: Consider adding a variant that takes ownership (Vec<E> or drain iterator)
280// to free entry memory as blocks are written.
281pub fn write_static_stored_file<E: Entry>(
282    entries: &[E],
283    file: &Path,
284    flags: MetaEntryFlags,
285) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
286    debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted());
287    let mut writer = StreamingSstWriter::new(file, flags, entries.len() as u64)?;
288    for entry in entries {
289        writer.add(entry)?;
290    }
291    writer.close()
292}
293
294// ---------------------------------------------------------------------------
295// Block I/O helpers (free functions for borrow-checker friendliness)
296// ---------------------------------------------------------------------------
297
298/// Writes a raw (already-formatted) block to the file. Returns the block index assigned.
299///
300/// `uncompressed_size` is the original uncompressed size of the block data, or `0` if the block
301/// is stored uncompressed.
302fn write_raw_block_to_file(
303    file: &mut BufWriter<File>,
304    block_offsets: &mut Vec<u32>,
305    uncompressed_size: u32,
306    checksum: u32,
307    block: &[u8],
308) -> Result<u16> {
309    let block_index: u16 = block_offsets
310        .len()
311        .try_into()
312        .expect("Block index overflow");
313
314    let len: u32 = (block.len() + BLOCK_HEADER_SIZE).try_into().unwrap();
315    let offset = block_offsets
316        .last()
317        .copied()
318        .unwrap_or_default()
319        .checked_add(len)
320        .expect("Block offset overflow");
321    block_offsets.push(offset);
322
323    file.write_u32::<BE>(uncompressed_size)
324        .context("Failed to write uncompressed size")?;
325    file.write_u32::<BE>(checksum)
326        .context("Failed to write checksum")?;
327    file.write_all(block)
328        .context("Failed to write block data")?;
329    Ok(block_index)
330}
331
332/// Writes a block to the file, optionally compressing it. Returns the block index assigned.
333fn write_block_to_file(
334    file: &mut BufWriter<File>,
335    compress_buffer: &mut Vec<u8>,
336    block_offsets: &mut Vec<u32>,
337    block: &[u8],
338    try_compress: bool,
339) -> Result<u16> {
340    let (uncompressed_size, data_to_write): (u32, &[u8]) = if try_compress {
341        compress_into_buffer(block, compress_buffer)?;
342        // Same threshold as LevelDB/RocksDB: require at least 12.5% savings.
343        if compress_buffer.len() < block.len() - (block.len() / 8) {
344            (block.len().try_into().unwrap(), compress_buffer.as_slice())
345        } else {
346            (0, block)
347        }
348    } else {
349        (0, block)
350    };
351
352    // Checksum is computed on the on-disk data (after compression).
353    let checksum = checksum_block(data_to_write);
354
355    let result = write_raw_block_to_file(
356        file,
357        block_offsets,
358        uncompressed_size,
359        checksum,
360        data_to_write,
361    );
362    compress_buffer.clear();
363    result
364}
365
366// ---------------------------------------------------------------------------
367// StreamingSstWriter
368// ---------------------------------------------------------------------------
369
/// Where a key entry's value lives (or will live once the small block flushes).
enum ValueRef {
    /// Value in a known small value block (already flushed).
    Small {
        block_index: u16,
        offset: u32,
        size: u16,
    },
    /// Value is in a small value block that hasn't been written yet. Will be resolved in-place
    /// to [`ValueRef::Small`] when the small block is flushed.
    PendingSmall {
        /// Debug-only: which pending small block this offset belongs to, for consistency checks.
        #[cfg(debug_assertions)]
        small_block_id: u16,
        offset: u32,
        size: u16,
    },
    /// Medium value already written to its own block.
    Medium { block_index: u16 },
    /// Inline value (stored directly in the key block).
    Inline {
        /// Fixed-size backing storage; only the first `len` bytes are meaningful.
        data: [u8; MAX_INLINE_VALUE_SIZE],
        len: u8,
    },
    /// Large blob stored externally.
    Blob { blob_id: u32 },
    /// Tombstone.
    Deleted,
}
398
399impl ValueRef {
400    /// Returns the key block entry type for this value reference.
401    fn entry_type(&self) -> EntryType {
402        EntryType(match self {
403            ValueRef::Small { .. } | ValueRef::PendingSmall { .. } => KEY_BLOCK_ENTRY_TYPE_SMALL,
404            ValueRef::Medium { .. } => KEY_BLOCK_ENTRY_TYPE_MEDIUM,
405            ValueRef::Inline { len, .. } => KEY_BLOCK_ENTRY_TYPE_INLINE_MIN + *len,
406            ValueRef::Blob { .. } => KEY_BLOCK_ENTRY_TYPE_BLOB,
407            ValueRef::Deleted => KEY_BLOCK_ENTRY_TYPE_DELETED,
408        })
409    }
410
411    /// Writes the value bytes for this reference to a buffer.
412    ///
413    /// This is the shared serialization logic used by both variable-size and fixed-size key block
414    /// builders.
415    fn write_value_to(&self, buffer: &mut Vec<u8>) {
416        match self {
417            ValueRef::Small {
418                block_index,
419                offset,
420                size,
421            } => {
422                let mut scratch = [0u8; 8];
423                BE::write_u16(&mut scratch, *block_index);
424                BE::write_u16(&mut scratch[2..], *size);
425                BE::write_u32(&mut scratch[4..], *offset);
426                buffer.extend(&scratch);
427            }
428            ValueRef::Medium { block_index } => {
429                let mut scratch = [0u8; 2];
430                BE::write_u16(&mut scratch, *block_index);
431                buffer.extend(scratch);
432            }
433            ValueRef::Inline { data, len } => {
434                buffer.extend(&data[..*len as usize]);
435            }
436            ValueRef::Blob { blob_id } => {
437                let mut scratch = [0u8; 4];
438                BE::write_u32(&mut scratch, *blob_id);
439                buffer.extend(scratch);
440            }
441            ValueRef::Deleted => { /* no value bytes */ }
442            ValueRef::PendingSmall { .. } => {
443                unreachable!("PendingSmall should have been resolved");
444            }
445        }
446    }
447}
448
/// A key entry queued in [`StreamingSstWriter::pending_keys`] together with its (possibly still
/// pending) value reference.
struct PendingEntry<E> {
    entry: E,
    value_ref: ValueRef,
}
453
/// A streaming SST file writer that writes blocks to disk incrementally.
///
/// Instead of materializing all entries in memory and then writing all value blocks followed by all
/// key blocks, this writer interleaves block writes as entries arrive. Medium values are written
/// immediately, small values are accumulated into blocks, and key blocks are flushed as soon as
/// their value references are all resolved.
///
/// The SST reader is block-index-addressed (not file-position-addressed), so interleaving block
/// types is fully compatible.
pub struct StreamingSstWriter<E: Entry> {
    // File I/O. Wrapped in Option so close() can take ownership without a partial-move
    // compile error (partial moves are forbidden when the type has a Drop impl).
    file: Option<BufWriter<File>>,
    /// Scratch buffer reused across block compressions (cleared after each write).
    compress_buffer: Vec<u8>,
    /// Cumulative end offset of each block written so far; its length is the next block index.
    block_offsets: Vec<u32>,

    /// Pending key entries waiting to be flushed as key blocks.
    ///
    /// Entries are appended at the back and drained from the front once flushed.
    ///
    /// ```text
    ///  Resolved entries              Unresolved entries
    ///  (value block index known)     (PendingSmall references)
    /// |------------------------------|--------------------------|
    /// 0                     first_pending_small_index         len()
    ///
    ///  ^-- current_key_block tracks      ^-- these wait for
    ///      the incomplete tail block         flush_small_value_block()
    ///      within this region                to resolve them
    /// ```
    ///
    /// [`advance_boundary_to`](Self::advance_boundary_to) scans the resolved prefix, flushes
    /// complete key blocks from the front, and drains them. When a small value block is flushed,
    /// all `PendingSmall` entries are resolved in-place and the boundary advances to `len()`.
    ///
    /// **Unbounded growth note:** If a small number of small values appear early, followed by
    /// many medium/inline values, the queue grows because the front entries block on the
    /// unflushed small value block while the back keeps accepting resolved entries.
    pending_keys: VecDeque<PendingEntry<E>>,

    /// Index into `pending_keys` of the first entry that has a `PendingSmall` reference for the
    /// current (unflushed) small value block. All entries before this index are fully resolved
    /// (their value block indices are known). Equals `pending_keys.len()` when no pending small
    /// entries exist.
    first_pending_small_index: usize,

    /// The current small_block_id being accumulated into (debug-only consistency check).
    #[cfg(debug_assertions)]
    current_small_block_id: u16,

    // Pending small value block buffer.
    pending_small_value_block: Vec<u8>,

    // Reusable buffer for building key blocks
    key_buffer: Vec<u8>,

    // AMQF filter (built incrementally). Wrapped in Option for the same reason as `file`.
    filter: Option<qfilter::Filter>,

    // Index block data: (first_hash, block_index) for each key block written
    key_block_boundaries: Vec<(u64, u16)>,

    // Metadata
    /// Hash of the first entry added (u64::MAX until the first add()).
    min_hash: u64,
    /// Hash of the most recently added entry (entries arrive in hash order).
    max_hash: u64,
    /// Total number of entries added so far.
    entry_count: u64,
    /// Status flags to report in the final metadata.
    flags: MetaEntryFlags,

    // Fullness tracking (for compaction callers)
    total_key_size: usize,
    total_value_size: usize,

    /// Total byte size of keys in `pending_keys` (for block capacity estimation).
    pending_key_total_size: usize,

    /// State of the current incomplete key block at the tail of the resolved prefix.
    current_key_block: KeyBlockAccumulator,

    /// Set to `true` by `close()` so the Drop guard can detect writers dropped without closing.
    #[cfg(debug_assertions)]
    finished: bool,
}
536
537impl<E: Entry> StreamingSstWriter<E> {
538    /// Creates a new streaming SST writer.
539    ///
540    /// `max_entry_count` is used to size the AMQF filter. It must be an upper bound on the number
541    /// of entries that will be added; the filter is not resizable. A slightly oversized value only
542    /// improves the false-positive rate.
543    pub fn new(file: &Path, flags: MetaEntryFlags, max_entry_count: u64) -> Result<Self> {
544        let file = BufWriter::new(File::create(file)?);
545        let filter = qfilter::Filter::new(max_entry_count.max(1), AMQF_FALSE_POSITIVE_RATE)
546            .expect("Filter can't be constructed");
547
548        // Estimate number of key blocks based on max entry count.
549        // Each key block holds up to MAX_KEY_BLOCK_ENTRIES entries.
550        let estimated_key_blocks = (max_entry_count as usize)
551            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
552            .max(1);
553        // Estimate value blocks assuming all entries are small values of average size.
554        // Each small value block holds ~MIN_SMALL_VALUE_BLOCK_SIZE / AVG_SMALL_VALUE_SIZE entries.
555        let entries_per_value_block = MIN_SMALL_VALUE_BLOCK_SIZE / AVG_SMALL_VALUE_SIZE;
556        let estimated_value_blocks = (max_entry_count as usize)
557            .div_ceil(entries_per_value_block)
558            .max(1);
559        let estimated_total_blocks = estimated_key_blocks + estimated_value_blocks + 1;
560
561        Ok(Self {
562            file: Some(file),
563            compress_buffer: Vec::with_capacity(MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE),
564            block_offsets: Vec::with_capacity(estimated_total_blocks),
565            pending_keys: VecDeque::with_capacity(entries_per_value_block),
566            first_pending_small_index: 0,
567            #[cfg(debug_assertions)]
568            current_small_block_id: 0,
569            pending_small_value_block: Vec::with_capacity(
570                MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE,
571            ),
572            key_buffer: Vec::with_capacity(MAX_KEY_BLOCK_SIZE),
573            filter: Some(filter),
574            key_block_boundaries: Vec::with_capacity(estimated_key_blocks),
575            min_hash: u64::MAX,
576            max_hash: 0,
577            entry_count: 0,
578            flags,
579            total_key_size: 0,
580            total_value_size: 0,
581            pending_key_total_size: 0,
582            current_key_block: KeyBlockAccumulator::new(),
583            #[cfg(debug_assertions)]
584            finished: false,
585        })
586    }
587
588    /// Returns true if the SST file has reached capacity limits.
589    ///
590    /// This is intended for compaction callers that need to split output across multiple SST files.
591    pub fn is_full(&self, max_entries: usize, max_data_size: usize) -> bool {
592        self.entry_count as usize >= max_entries
593            || self.total_key_size + self.total_value_size >= max_data_size
594            || !self.has_block_index_capacity()
595    }
596
597    /// Returns true if the SST file has room for more blocks without overflowing the `u16` block
598    /// index. Uses the exact count of blocks already written plus a conservative estimate of
599    /// blocks still needed for pending entries and the index.
600    fn has_block_index_capacity(&self) -> bool {
601        let blocks_written = self.block_offsets.len();
602        // Blocks still needed:
603        // - 1 pending small value block (if buffer is non-empty)
604        // - key blocks for pending entries (upper bound from both entry count and byte size)
605        // - 1 index block
606        let pending_small_block = usize::from(!self.pending_small_value_block.is_empty());
607        let pending_key_blocks = self
608            .pending_keys
609            .len()
610            .div_ceil(MAX_KEY_BLOCK_ENTRIES)
611            .max(self.pending_key_total_size.div_ceil(MAX_KEY_BLOCK_SIZE))
612            .max(1);
613        let index_block = 1;
614        let buffer = BLOCK_INDEX_CAPACITY_BUFFER;
615        blocks_written + pending_small_block + pending_key_blocks + index_block + buffer
616            < u16::MAX as usize
617    }
618
619    /// Adds an entry to the SST file. Entries must be added in (key-hash, key) order.
620    pub fn add(&mut self, entry: E) -> Result<()> {
621        let key_hash = entry.key_hash();
622        let key_len = entry.key_len();
623
624        // Update metadata
625        if self.entry_count == 0 {
626            self.min_hash = key_hash;
627        }
628        self.max_hash = key_hash;
629        self.entry_count += 1;
630
631        // Insert into AMQF
632        self.filter
633            .as_mut()
634            .unwrap()
635            .insert_fingerprint(false, key_hash)
636            .expect("AMQF insert failed");
637
638        // Track key size for fullness and block capacity
639        self.total_key_size += key_len;
640        self.pending_key_total_size += key_len;
641
642        // Route value
643        let value_ref = match entry.value() {
644            EntryValue::Medium { value } => {
645                self.total_value_size += value.len();
646                let block_index = write_block_to_file(
647                    self.file.as_mut().unwrap(),
648                    &mut self.compress_buffer,
649                    &mut self.block_offsets,
650                    value,
651                    true,
652                )
653                .context("Failed to write value block")?;
654                ValueRef::Medium { block_index }
655            }
656            EntryValue::MediumRaw {
657                uncompressed_size,
658                checksum,
659                block,
660            } => {
661                // Note: tracks compressed block size (not uncompressed) unlike EntryValue::Medium.
662                // Both are acceptable approximations of disk usage for is_full() thresholds.
663                self.total_value_size += block.len();
664                let block_index = write_raw_block_to_file(
665                    self.file.as_mut().unwrap(),
666                    &mut self.block_offsets,
667                    uncompressed_size,
668                    checksum,
669                    block,
670                )
671                .context("Failed to write compressed value block")?;
672                ValueRef::Medium { block_index }
673            }
674            EntryValue::Small { value } => {
675                self.total_value_size += value.len();
676
677                let offset = self.pending_small_value_block.len() as u32;
678                let size: u16 = value.len().try_into().unwrap();
679                self.pending_small_value_block.extend_from_slice(value);
680
681                // Track where the first PendingSmall entry is in the queue
682                if self.first_pending_small_index >= self.pending_keys.len() {
683                    self.first_pending_small_index = self.pending_keys.len();
684                }
685
686                let value_ref = ValueRef::PendingSmall {
687                    #[cfg(debug_assertions)]
688                    small_block_id: self.current_small_block_id,
689                    offset,
690                    size,
691                };
692
693                self.push_pending_key_entry(entry, value_ref);
694
695                // Eagerly flush the small block AFTER pushing the new entry. This resolves
696                // the just-pushed entry immediately via advance_boundary_to(), so key blocks
697                // can be flushed incrementally.
698                if self.pending_small_value_block.len() >= MIN_SMALL_VALUE_BLOCK_SIZE {
699                    self.flush_small_value_block()?;
700                }
701
702                return Ok(());
703            }
704            EntryValue::Inline { value } => {
705                debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
706                let mut data = [0u8; MAX_INLINE_VALUE_SIZE];
707                data[..value.len()].copy_from_slice(value);
708                ValueRef::Inline {
709                    data,
710                    len: value.len() as u8,
711                }
712            }
713            EntryValue::Large { blob } => ValueRef::Blob { blob_id: blob },
714            EntryValue::Deleted => ValueRef::Deleted,
715        };
716
717        self.push_pending_key_entry(entry, value_ref);
718        self.try_flush_key_blocks()
719    }
720
721    /// Appends a new entry to the pending-keys queue.
722    fn push_pending_key_entry(&mut self, entry: E, value_ref: ValueRef) {
723        self.pending_keys
724            .push_back(PendingEntry { entry, value_ref });
725    }
726
727    /// Advances `first_pending_small_index` past the just-pushed entry if it is resolved and
728    /// sits right at the current boundary. Flushes complete key blocks incrementally.
729    ///
730    /// Must be called immediately after [`push_pending_key_entry`] with a resolved
731    /// (non-`PendingSmall`) entry.
732    fn try_flush_key_blocks(&mut self) -> Result<()> {
733        debug_assert!(!matches!(
734            self.pending_keys.back().unwrap().value_ref,
735            ValueRef::PendingSmall { .. }
736        ));
737        if self.first_pending_small_index != self.pending_keys.len() - 1 {
738            // Boundary is blocked by earlier unresolved PendingSmall entries.
739            return Ok(());
740        }
741        self.advance_boundary_to(self.pending_keys.len())
742    }
743
    /// Advances the resolved boundary from its current position to `new_boundary`,
    /// incrementally tracking key block sizes and flushing complete key blocks.
    ///
    /// All entries in `pending_keys[self.first_pending_small_index..new_boundary]`
    /// must have resolved (non-`PendingSmall`) value references.
    fn advance_boundary_to(&mut self, new_boundary: usize) -> Result<()> {
        // Index into `pending_keys` one past the last entry flushed during this call.
        let mut last_flushed_end = 0usize;
        // Cumulative key sizes of all entries visited so far, and the snapshot at the last
        // flush point. The difference at the end gives the total key size of drained entries.
        let mut cumulative_key_size = 0usize;
        let mut flushed_key_size = 0usize;

        for i in self.first_pending_small_index..new_boundary {
            let entry = &self.pending_keys[i];
            let key_len = entry.entry.key_len();
            let key_hash = entry.entry.key_hash();
            let value_type = entry.value_ref.entry_type();

            if self.current_key_block.should_flush(key_len, key_hash) {
                // `current_key_block` may still hold entries accumulated by earlier calls
                // (indices below `first_pending_small_index`), so the block end is derived
                // from its entry count rather than from the loop index `i`.
                let block_end = last_flushed_end + self.current_key_block.entry_count;
                let info = self.current_key_block.flush_info();
                self.flush_key_block(last_flushed_end, block_end, info)?;
                flushed_key_size = cumulative_key_size;
                last_flushed_end = block_end;
                self.current_key_block.reset();
            }

            // Add the entry to the (possibly freshly reset) accumulator *after* the flush
            // check, so a flushed block never contains it.
            cumulative_key_size += key_len;
            self.current_key_block.add(key_len, key_hash, value_type);
        }

        if last_flushed_end > 0 {
            // Drop the flushed entries and their key bytes from the pending bookkeeping.
            self.pending_key_total_size -= flushed_key_size;
            self.pending_keys.drain(..last_flushed_end);
        }

        // Re-anchor the boundary after the drain shifted all remaining indices down.
        self.first_pending_small_index = new_boundary - last_flushed_end;
        Ok(())
    }
783
    /// Flushes the current pending small value block to disk and resolves all `PendingSmall`
    /// entries in-place.
    fn flush_small_value_block(&mut self) -> Result<()> {
        // Early return if empty -- this simplifies trailing small value block handling in
        // `close()` where we call this unconditionally.
        if self.pending_small_value_block.is_empty() {
            return Ok(());
        }

        // Write the accumulated block; `true` corresponds to the `try_compress` flag used
        // for key blocks. The returned block index is what the key entries will reference.
        let block_index = write_block_to_file(
            self.file.as_mut().unwrap(),
            &mut self.compress_buffer,
            &mut self.block_offsets,
            &self.pending_small_value_block,
            true,
        )
        .context("Failed to write small value block")?;

        // Resolve all PendingSmall entries for this block in-place.
        // Only scan from first_pending_small_index -- entries before it are guaranteed
        // already resolved (from previous flush calls).
        #[cfg(debug_assertions)]
        let flushed_id = self.current_small_block_id;
        for i in self.first_pending_small_index..self.pending_keys.len() {
            let entry = &mut self.pending_keys[i];
            if let ValueRef::PendingSmall {
                #[cfg(debug_assertions)]
                small_block_id,
                offset,
                size,
            } = entry.value_ref
            {
                #[cfg(debug_assertions)]
                debug_assert_eq!(
                    small_block_id, flushed_id,
                    "all pending small entries must reference the small value block that was just \
                     written"
                );
                // Offset and size within the block stay the same; only the block index
                // becomes known now that the block was written.
                entry.value_ref = ValueRef::Small {
                    block_index,
                    offset,
                    size,
                };
            }
        }

        // All PendingSmall entries are now resolved. Advance the boundary through all of
        // them, flushing key blocks incrementally as we go.
        self.advance_boundary_to(self.pending_keys.len())?;

        // Advance to next small block id (debug-only consistency check)
        #[cfg(debug_assertions)]
        {
            self.current_small_block_id += 1;
        }
        // clear() keeps the allocation for reuse by the next small value block.
        self.pending_small_value_block.clear();

        Ok(())
    }
843
844    /// Flushes a single key block from `pending_keys[start..end]`.
845    fn flush_key_block(&mut self, start: usize, end: usize, info: KeyBlockFlushInfo) -> Result<()> {
846        let entry_count = end - start;
847        let has_hash = use_hash(info.max_key_len);
848        let try_compress = info.max_key_len >= MIN_KEY_SIZE_FOR_COMPRESSION;
849
850        self.key_buffer.clear();
851
852        if let KeyBlockFormat::Fixed {
853            key_len: key_size,
854            value_type,
855        } = info.format
856        {
857            let mut builder = FixedKeyBlockBuilder::new(
858                &mut self.key_buffer,
859                entry_count as u32,
860                has_hash,
861                key_size,
862                value_type,
863            );
864            for i in start..end {
865                let pending = &self.pending_keys[i];
866                builder.put(&pending.entry, &pending.value_ref, has_hash);
867            }
868            builder.finish();
869        } else {
870            let mut builder =
871                KeyBlockBuilder::new(&mut self.key_buffer, entry_count as u32, has_hash);
872
873            for i in start..end {
874                let pending = &self.pending_keys[i];
875                builder.put(&pending.entry, &pending.value_ref, has_hash);
876            }
877
878            builder.finish();
879        }
880
881        // Record boundary
882        let first_hash = self.pending_keys[start].entry.key_hash();
883        let block_index = write_block_to_file(
884            self.file.as_mut().unwrap(),
885            &mut self.compress_buffer,
886            &mut self.block_offsets,
887            &self.key_buffer,
888            try_compress,
889        )
890        .context("Failed to write key block")?;
891        self.key_block_boundaries.push((first_hash, block_index));
892
893        Ok(())
894    }
895
    /// Finishes writing the SST file. Flushes remaining blocks, writes the index, and returns
    /// metadata.
    pub fn close(mut self) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
        // Mark as finished so the debug-only Drop check does not fire.
        #[cfg(debug_assertions)]
        {
            self.finished = true;
        }

        // Flush remaining small value block (even if under MIN_SMALL_VALUE_BLOCK_SIZE).
        self.flush_small_value_block()?;

        // Now all PendingSmall entries are resolved. Flush all remaining key blocks.
        self.flush_remaining_key_blocks()?;

        assert!(
            !self.key_block_boundaries.is_empty(),
            "StreamingSstWriter::close() called with no entries"
        );

        let mut file = self.file.take().unwrap();

        // Write index block (never compressed). Buffer into a Vec first so we can
        // compute the checksum, then write via the standard block helper.
        // The first boundary's block index is stored in the index header itself
        // (see IndexBlockBuilder::new), so the entry table holds one entry per
        // *remaining* boundary: len - 1.
        let index_entry_count: u16 = (self.key_block_boundaries.len() - 1)
            .try_into()
            .expect("Index entries count overflow");
        let index_block_size: usize =
            INDEX_BLOCK_HEADER_SIZE + index_entry_count as usize * INDEX_BLOCK_ENTRY_SIZE;
        let mut index_buf = Vec::with_capacity(index_block_size);
        {
            let first_block = self.key_block_boundaries[0].1;
            let mut index_block = IndexBlockBuilder::new(&mut index_buf, first_block);
            for &(hash, block) in &self.key_block_boundaries[1..] {
                index_block.put(hash, block);
            }
        }
        let index_checksum = checksum_block(&index_buf);
        // uncompressed_size = 0 marks the block as stored as-is (no compression).
        write_raw_block_to_file(
            &mut file,
            &mut self.block_offsets,
            0,
            index_checksum,
            &index_buf,
        )
        .context("Failed to write index block")?;

        // Write block offset table
        for offset in &self.block_offsets {
            file.write_u32::<BE>(*offset)
                .context("Failed to write block offset")?;
        }

        let block_count: u16 = self
            .block_offsets
            .len()
            .try_into()
            .expect("Block count overflow");

        // Shrink the AMQF filter to the actual entry count. The filter was created with
        // `max_entry_count` which may be larger than the number of entries actually added.
        let mut filter = self.filter.take().unwrap();
        filter.shrink_to_fit();

        // Serialize AMQF
        let amqf =
            turbo_bincode_encode(&AmqfBincodeWrapper(filter)).expect("AMQF serialization failed");

        // Compute file size from block offsets rather than calling stream_position()
        // (which requires a flush + seek).
        let last_block_end = self.block_offsets.last().copied().unwrap_or_default() as u64;
        let offset_table_size = block_count as u64 * size_of::<u32>() as u64;
        let file_size = last_block_end + offset_table_size;

        let meta = StaticSortedFileBuilderMeta {
            min_hash: self.min_hash,
            max_hash: self.max_hash,
            amqf: Cow::Owned(amqf.into_vec()),
            block_count,
            size: file_size,
            flags: self.flags,
            entries: self.entry_count,
        };

        // into_inner() flushes the BufWriter and surfaces any pending write error.
        Ok((meta, file.into_inner()?))
    }
981
982    /// Flushes all remaining entries as key blocks. Called from `close()` after all small value
983    /// blocks have been flushed, so all PendingSmall entries are resolved.
984    ///
985    /// This loop mirrors [`advance_boundary_to`], but uses a local accumulator (since the
986    /// `self.current_key_block` state is stale) and flushes the final incomplete block
987    /// (unlike `advance_boundary_to`, which keeps it for more entries during streaming).
988    fn flush_remaining_key_blocks(&mut self) -> Result<()> {
989        if self.pending_keys.is_empty() {
990            return Ok(());
991        }
992
993        // After flush_small_value_block() in close(), no PendingSmall entries should remain.
994        // first_pending_small_index may be non-zero (when all entries are medium/inline/etc
995        // and advance_boundary_to was never called), but it must equal pending_keys.len(),
996        // meaning no entries after the boundary exist.
997        debug_assert_eq!(
998            self.first_pending_small_index,
999            self.pending_keys.len(),
1000            "expected no unresolved PendingSmall entries after flush_small_value_block"
1001        );
1002
1003        let total = self.pending_keys.len();
1004        let mut block_start = 0;
1005        let mut acc = KeyBlockAccumulator::new();
1006
1007        for i in 0..total {
1008            let entry = &self.pending_keys[i];
1009            let key_len = entry.entry.key_len();
1010            let key_hash = entry.entry.key_hash();
1011            let value_type = entry.value_ref.entry_type();
1012
1013            if acc.should_flush(key_len, key_hash) {
1014                self.flush_key_block(block_start, i, acc.flush_info())?;
1015                block_start = i;
1016                acc.reset();
1017            }
1018
1019            acc.add(key_len, key_hash, value_type);
1020        }
1021
1022        // Flush the final block
1023        if block_start < total {
1024            self.flush_key_block(block_start, total, acc.flush_info())?;
1025        }
1026
1027        // Free VecDeque memory. Numeric fields are not reset because close() consumes self.
1028        self.pending_keys.clear();
1029        Ok(())
1030    }
1031}
1032
1033#[cfg(debug_assertions)]
1034impl<E: Entry> Drop for StreamingSstWriter<E> {
1035    fn drop(&mut self) {
1036        // Skip assertion during panic unwinding to avoid a double-panic (which would abort).
1037        if !std::thread::panicking() {
1038            assert!(
1039                self.finished || self.entry_count == 0,
1040                "StreamingSstWriter dropped without calling close()"
1041            );
1042        }
1043    }
1044}
1045
1046// ---------------------------------------------------------------------------
1047// KeyBlockBuilder
1048// ---------------------------------------------------------------------------
1049
/// Builder for a single key block.
///
/// Entries are added via `put_*` methods which write key data and value references into the buffer.
/// The block format uses a fixed-size header table followed by variable-length entry data.
struct KeyBlockBuilder<'l> {
    // Index of the next entry to be written; locates its slot in the offset table.
    current_entry: usize,
    // Buffer length right after the header and zeroed offset table were written; entry
    // positions are stored relative to this point (see `write_entry_header`).
    header_size: usize,
    // Output buffer the block is serialized into.
    buffer: &'l mut Vec<u8>,
}

/// The size of the key block header (block type + entry count).
const KEY_BLOCK_HEADER_SIZE: usize = 4;
1062
1063impl<'l> KeyBlockBuilder<'l> {
1064    /// Creates a new key block builder for the number of entries.
1065    fn new(buffer: &'l mut Vec<u8>, entry_count: u32, has_hash: bool) -> Self {
1066        debug_assert!(entry_count < (1 << 24));
1067
1068        const ESTIMATED_KEY_SIZE: usize = 16;
1069        buffer.reserve(entry_count as usize * ESTIMATED_KEY_SIZE);
1070        let block_type = if has_hash {
1071            BLOCK_TYPE_KEY_WITH_HASH
1072        } else {
1073            BLOCK_TYPE_KEY_NO_HASH
1074        };
1075        buffer.write_u8(block_type).unwrap();
1076        buffer.write_u24::<BE>(entry_count).unwrap();
1077        for _ in 0..entry_count {
1078            buffer.write_u32::<BE>(0).unwrap();
1079        }
1080        Self {
1081            current_entry: 0,
1082            header_size: buffer.len(),
1083            buffer,
1084        }
1085    }
1086
1087    /// Writes the entry header (position + type) for the current entry.
1088    fn write_entry_header(&mut self, entry_type: EntryType) {
1089        let pos = self.buffer.len() - self.header_size;
1090        let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4;
1091        let header = (pos as u32) | ((entry_type.0 as u32) << 24);
1092        BE::write_u32(&mut self.buffer[header_offset..header_offset + 4], header);
1093    }
1094
1095    /// Writes a single entry (header + hash + key + value data) to the block.
1096    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
1097        self.write_entry_header(value_ref.entry_type());
1098        if has_hash {
1099            self.buffer
1100                .extend_from_slice(&entry.key_hash().to_be_bytes());
1101        }
1102        entry.write_key_to(self.buffer);
1103        value_ref.write_value_to(self.buffer);
1104        self.current_entry += 1;
1105    }
1106
1107    /// Returns the key block buffer.
1108    fn finish(self) -> &'l mut Vec<u8> {
1109        self.buffer
1110    }
1111}
1112
1113// ---------------------------------------------------------------------------
1114// FixedKeyBlockBuilder
1115// ---------------------------------------------------------------------------
1116
/// The size of the fixed-size key block header (block type + entry count + key size + value type).
const FIXED_KEY_BLOCK_HEADER_SIZE: usize = 6;

/// Builder for a fixed-size key block where all entries share the same key size and value type.
///
/// No offset table is written — entry positions are computed arithmetically from the stride.
struct FixedKeyBlockBuilder<'l> {
    // Output buffer; entries are appended back-to-back with a constant stride.
    buffer: &'l mut Vec<u8>,
}
1126
1127impl<'l> FixedKeyBlockBuilder<'l> {
1128    fn new(
1129        buffer: &'l mut Vec<u8>,
1130        entry_count: u32,
1131        has_hash: bool,
1132        key_size: u8,
1133        value_type: EntryType,
1134    ) -> Self {
1135        let hash_len: usize = if has_hash { 8 } else { 0 };
1136        let val_size = value_type_val_size(value_type);
1137        let stride = hash_len + key_size as usize + val_size;
1138        buffer.reserve(FIXED_KEY_BLOCK_HEADER_SIZE + entry_count as usize * stride);
1139
1140        let block_type = if has_hash {
1141            BLOCK_TYPE_FIXED_KEY_WITH_HASH
1142        } else {
1143            BLOCK_TYPE_FIXED_KEY_NO_HASH
1144        };
1145        buffer.extend_from_slice(&[
1146            block_type,
1147            (entry_count >> 16) as u8,
1148            (entry_count >> 8) as u8,
1149            entry_count as u8,
1150            key_size,
1151            value_type.0,
1152        ]);
1153
1154        Self { buffer }
1155    }
1156
1157    /// Writes a single entry (hash + key + value data) to the block.
1158    fn put<E: Entry>(&mut self, entry: &E, value_ref: &ValueRef, has_hash: bool) {
1159        if has_hash {
1160            self.buffer
1161                .extend_from_slice(&entry.key_hash().to_be_bytes());
1162        }
1163        entry.write_key_to(self.buffer);
1164        value_ref.write_value_to(self.buffer);
1165    }
1166
1167    fn finish(self) -> &'l mut Vec<u8> {
1168        self.buffer
1169    }
1170}
1171
/// Returns the value size for a given entry type (builder-side, infallible).
///
/// This mirrors `entry_val_size` in the reader but panics on invalid types since the builder
/// only produces valid types.
fn value_type_val_size(ty: EntryType) -> usize {
    match ty.0 {
        KEY_BLOCK_ENTRY_TYPE_SMALL => SMALL_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_MEDIUM => MEDIUM_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_BLOB => BLOB_VALUE_REF_SIZE,
        KEY_BLOCK_ENTRY_TYPE_DELETED => DELETED_VALUE_REF_SIZE,
        // Inline entries encode the value length directly in the type byte as an offset
        // from KEY_BLOCK_ENTRY_TYPE_INLINE_MIN.
        ty if ty >= KEY_BLOCK_ENTRY_TYPE_INLINE_MIN => {
            (ty - KEY_BLOCK_ENTRY_TYPE_INLINE_MIN) as usize
        }
        _ => panic!("Invalid key block entry type: {:?}", ty),
    }
}
1188
1189// ---------------------------------------------------------------------------
1190// IndexBlockBuilder
1191// ---------------------------------------------------------------------------
1192
/// Builder for a single index block.
///
/// Layout: a 1-byte block type, the index of the first key block, then one
/// `(hash, block)` pair per additional key block boundary.
struct IndexBlockBuilder<W: Write> {
    writer: W,
}

/// Size of a single index block entry (u64 hash + u16 block index).
const INDEX_BLOCK_ENTRY_SIZE: usize = size_of::<u64>() + size_of::<u16>();

/// Size of the index block header (u8 type + u16 first_block).
const INDEX_BLOCK_HEADER_SIZE: usize = size_of::<u8>() + size_of::<u16>();

impl<W: Write> IndexBlockBuilder<W> {
    /// Creates a new index block builder, immediately writing the block type tag and the
    /// pointer to the first key block.
    fn new(mut writer: W, first_block: u16) -> Self {
        writer.write_u8(BLOCK_TYPE_INDEX).unwrap();
        writer.write_u16::<BE>(first_block).unwrap();
        Self { writer }
    }

    /// Adds a hash boundary to the index block: `hash` is the first key hash of the key
    /// block at index `block` (see how `close()` records boundaries).
    fn put(&mut self, hash: u64, block: u16) {
        self.writer.write_u64::<BE>(hash).unwrap();
        self.writer.write_u16::<BE>(block).unwrap();
    }
}
1219
1220#[cfg(test)]
1221mod tests {
1222    use std::hash::BuildHasherDefault;
1223
1224    use quick_cache::sync::Cache;
1225    use rustc_hash::FxHasher;
1226
1227    use super::*;
1228    use crate::{
1229        key::hash_key,
1230        lookup_entry::LookupValue,
1231        static_sorted_file::{
1232            BlockWeighter, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData,
1233        },
1234    };
1235
    // Block cache keyed by (sequence number, block index), matching the cache type that
    // `StaticSortedFile::lookup` expects for key and value blocks.
    type TestBlockCache =
        Cache<(u32, u16), crate::ArcBytes, BlockWeighter, BuildHasherDefault<FxHasher>>;

    /// Creates a small block cache for test lookups (default weighter/hasher/lifecycle).
    fn make_cache() -> TestBlockCache {
        TestBlockCache::with(
            100,
            4 * 1024 * 1024,
            Default::default(),
            Default::default(),
            Default::default(),
        )
    }
1248
    /// A simple entry type for testing with configurable value type.
    struct TestEntry {
        // Raw key bytes.
        key: Vec<u8>,
        // Precomputed `hash_key(&key)`; entries are sorted by this before writing.
        hash: u64,
        // Which on-disk value representation this entry exercises.
        value_kind: TestValueKind,
    }

    /// The value representations an entry can take, mirroring `EntryValue`.
    enum TestValueKind {
        Inline(Vec<u8>),
        Small(Vec<u8>),
        Medium(Vec<u8>),
        /// Already-formatted block with `uncompressed_size = 0` (stored as-is).
        MediumRaw(Vec<u8>),
        Blob(u32),
        Deleted,
    }
1265
1266    impl TestEntry {
1267        fn new(key: &[u8], value_kind: TestValueKind) -> Self {
1268            let key = key.to_vec();
1269            let hash = hash_key(&key);
1270            Self {
1271                key,
1272                hash,
1273                value_kind,
1274            }
1275        }
1276
1277        fn small(key: &[u8], value: &[u8]) -> Self {
1278            Self::new(key, TestValueKind::Small(value.to_vec()))
1279        }
1280
1281        fn inline(key: &[u8], value: &[u8]) -> Self {
1282            debug_assert!(value.len() <= MAX_INLINE_VALUE_SIZE);
1283            Self::new(key, TestValueKind::Inline(value.to_vec()))
1284        }
1285
1286        fn medium(key: &[u8], value: &[u8]) -> Self {
1287            Self::new(key, TestValueKind::Medium(value.to_vec()))
1288        }
1289
1290        fn blob(key: &[u8], blob_id: u32) -> Self {
1291            Self::new(key, TestValueKind::Blob(blob_id))
1292        }
1293
1294        fn deleted(key: &[u8]) -> Self {
1295            Self::new(key, TestValueKind::Deleted)
1296        }
1297
1298        fn medium_raw(key: &[u8], value: &[u8]) -> Self {
1299            Self::new(key, TestValueKind::MediumRaw(value.to_vec()))
1300        }
1301
1302        fn expected_value(&self) -> Option<&[u8]> {
1303            match &self.value_kind {
1304                TestValueKind::Inline(v)
1305                | TestValueKind::Small(v)
1306                | TestValueKind::Medium(v)
1307                | TestValueKind::MediumRaw(v) => Some(v),
1308                _ => None,
1309            }
1310        }
1311    }
1312
1313    impl Entry for TestEntry {
1314        fn key_hash(&self) -> u64 {
1315            self.hash
1316        }
1317
1318        fn key_len(&self) -> usize {
1319            self.key.len()
1320        }
1321
1322        fn write_key_to(&self, buf: &mut Vec<u8>) {
1323            buf.extend_from_slice(&self.key);
1324        }
1325
1326        fn value(&self) -> EntryValue<'_> {
1327            match &self.value_kind {
1328                TestValueKind::Inline(v) => EntryValue::Inline { value: v },
1329                TestValueKind::Small(v) => EntryValue::Small { value: v },
1330                TestValueKind::Medium(v) => EntryValue::Medium { value: v },
1331                TestValueKind::MediumRaw(v) => EntryValue::MediumRaw {
1332                    // uncompressed_size = 0 means the block is stored as-is (no compression).
1333                    uncompressed_size: 0,
1334                    checksum: checksum_block(v),
1335                    block: v,
1336                },
1337                TestValueKind::Blob(id) => EntryValue::Large { blob: *id },
1338                TestValueKind::Deleted => EntryValue::Deleted,
1339            }
1340        }
1341    }
1342
1343    /// Sort entries by hash (required by SST writer).
1344    fn sort_entries(entries: &mut [TestEntry]) {
1345        entries.sort_by_key(|e| e.hash);
1346    }
1347
1348    /// Open an SST file for lookup given a path and metadata.
1349    fn open_sst(
1350        dir: &Path,
1351        seq: u32,
1352        meta: &StaticSortedFileBuilderMeta<'_>,
1353    ) -> Result<StaticSortedFile> {
1354        StaticSortedFile::open(
1355            dir,
1356            StaticSortedFileMetaData {
1357                sequence_number: seq,
1358                block_count: meta.block_count,
1359            },
1360        )
1361    }
1362
1363    /// Helper: write entries via StreamingSstWriter, return meta.
1364    fn write_sst(
1365        dir: &Path,
1366        seq: u32,
1367        entries: &[TestEntry],
1368        flags: MetaEntryFlags,
1369    ) -> Result<StaticSortedFileBuilderMeta<'static>> {
1370        let sst_path = dir.join(format!("{seq:08}.sst"));
1371        let mut writer = StreamingSstWriter::new(&sst_path, flags, entries.len() as u64)?;
1372        for entry in entries {
1373            writer.add(entry)?;
1374        }
1375        let (meta, _file) = writer.close()?;
1376        Ok(meta)
1377    }
1378
    /// Lookup a key in an SST file and assert it matches the expected value kind.
    fn assert_lookup(
        sst: &StaticSortedFile,
        entry: &TestEntry,
        kc: &TestBlockCache,
        vc: &TestBlockCache,
    ) -> Result<()> {
        let result = sst.lookup::<_, false>(entry.hash, &entry.key, kc, vc)?;
        // NOTE: arm order matters. The first arm matches *any* kind whose lookup yielded a
        // Slice; a Slice result for a value-less kind (Blob/Deleted) panics via the
        // `expect` below, which is the intended test failure.
        match (&entry.value_kind, result) {
            (_, SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Slice { .. }) =>
            {
                let LookupValue::Slice { value } = &values[0] else {
                    unreachable!()
                };
                let expected = entry
                    .expected_value()
                    .expect("Got Slice but entry has no value");
                assert_eq!(
                    value.as_ref(),
                    expected,
                    "value mismatch for key {:?}",
                    std::str::from_utf8(&entry.key)
                );
            }
            // Blob entries must come back as a Blob reference with the stored id.
            (TestValueKind::Blob(expected_id), SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Blob { .. }) =>
            {
                let LookupValue::Blob { sequence_number } = &values[0] else {
                    unreachable!()
                };
                assert_eq!(*sequence_number, *expected_id);
            }
            // Deleted entries must come back as an explicit tombstone.
            (TestValueKind::Deleted, SstLookupResult::Found(values))
                if values.len() == 1 && matches!(values[0], LookupValue::Deleted) => {}
            // Anything else (not found, wrong kind, wrong count) is a test failure.
            _ => {
                panic!(
                    "Unexpected lookup result for key {:?}",
                    std::str::from_utf8(&entry.key)
                );
            }
        }
        Ok(())
    }
1423
1424    #[test]
1425    fn single_inline_entry() -> Result<()> {
1426        let dir = tempfile::tempdir()?;
1427        let mut entries = vec![TestEntry::inline(b"key1", b"val1")];
1428        sort_entries(&mut entries);
1429
1430        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1431        assert_eq!(meta.entries, 1);
1432
1433        let sst = open_sst(dir.path(), 1, &meta)?;
1434        let kc = make_cache();
1435        let vc = make_cache();
1436        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1437        Ok(())
1438    }
1439
1440    #[test]
1441    fn single_small_entry() -> Result<()> {
1442        let dir = tempfile::tempdir()?;
1443        let value = vec![0xAB; 100]; // > MAX_INLINE_VALUE_SIZE, <= MAX_SMALL_VALUE_SIZE
1444        let mut entries = vec![TestEntry::small(b"skey", &value)];
1445        sort_entries(&mut entries);
1446
1447        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1448        assert_eq!(meta.entries, 1);
1449
1450        let sst = open_sst(dir.path(), 1, &meta)?;
1451        let kc = make_cache();
1452        let vc = make_cache();
1453        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1454        Ok(())
1455    }
1456
1457    #[test]
1458    fn single_medium_entry() -> Result<()> {
1459        let dir = tempfile::tempdir()?;
1460        let value = vec![0xCD; 8192]; // > MAX_SMALL_VALUE_SIZE
1461        let mut entries = vec![TestEntry::medium(b"mkey", &value)];
1462        sort_entries(&mut entries);
1463
1464        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1465        assert_eq!(meta.entries, 1);
1466
1467        let sst = open_sst(dir.path(), 1, &meta)?;
1468        let kc = make_cache();
1469        let vc = make_cache();
1470        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1471        Ok(())
1472    }
1473
1474    #[test]
1475    fn single_blob_entry() -> Result<()> {
1476        let dir = tempfile::tempdir()?;
1477        let mut entries = vec![TestEntry::blob(b"bkey", 42)];
1478        sort_entries(&mut entries);
1479
1480        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1481        assert_eq!(meta.entries, 1);
1482
1483        let sst = open_sst(dir.path(), 1, &meta)?;
1484        let kc = make_cache();
1485        let vc = make_cache();
1486        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1487        Ok(())
1488    }
1489
1490    #[test]
1491    fn single_deleted_entry() -> Result<()> {
1492        let dir = tempfile::tempdir()?;
1493        let mut entries = vec![TestEntry::deleted(b"dkey")];
1494        sort_entries(&mut entries);
1495
1496        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1497        assert_eq!(meta.entries, 1);
1498
1499        let sst = open_sst(dir.path(), 1, &meta)?;
1500        let kc = make_cache();
1501        let vc = make_cache();
1502        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1503        Ok(())
1504    }
1505
1506    #[test]
1507    fn many_small_values() -> Result<()> {
1508        let dir = tempfile::tempdir()?;
1509        // Create enough small entries to trigger multiple small value block flushes.
1510        // MIN_SMALL_VALUE_BLOCK_SIZE = 8KB, each value is 200 bytes -> ~40 entries per block.
1511        let count = 200;
1512        let mut entries: Vec<TestEntry> = (0..count)
1513            .map(|i| {
1514                let key = format!("key-{i:04}");
1515                let value = vec![(i & 0xFF) as u8; 200];
1516                TestEntry::small(key.as_bytes(), &value)
1517            })
1518            .collect();
1519        sort_entries(&mut entries);
1520
1521        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1522        assert_eq!(meta.entries, count as u64);
1523
1524        let sst = open_sst(dir.path(), 1, &meta)?;
1525        let kc = make_cache();
1526        let vc = make_cache();
1527
1528        for entry in &entries {
1529            assert_lookup(&sst, entry, &kc, &vc)?;
1530        }
1531        Ok(())
1532    }
1533
1534    #[test]
1535    fn many_medium_values() -> Result<()> {
1536        let dir = tempfile::tempdir()?;
1537        let count = 50;
1538        let mut entries: Vec<TestEntry> = (0..count)
1539            .map(|i| {
1540                let key = format!("mkey-{i:04}");
1541                let value = vec![(i & 0xFF) as u8; 8192];
1542                TestEntry::medium(key.as_bytes(), &value)
1543            })
1544            .collect();
1545        sort_entries(&mut entries);
1546
1547        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1548        assert_eq!(meta.entries, count as u64);
1549
1550        let sst = open_sst(dir.path(), 1, &meta)?;
1551        let kc = make_cache();
1552        let vc = make_cache();
1553
1554        for entry in &entries {
1555            assert_lookup(&sst, entry, &kc, &vc)?;
1556        }
1557        Ok(())
1558    }
1559
1560    #[test]
1561    fn mixed_value_types() -> Result<()> {
1562        let dir = tempfile::tempdir()?;
1563        let mut entries = vec![
1564            TestEntry::inline(b"a-inline", b"tiny"),
1565            TestEntry::small(b"b-small", &[0x11; 200]),
1566            TestEntry::medium(b"c-medium", &[0x22; 8192]),
1567            TestEntry::blob(b"d-blob", 99),
1568            TestEntry::deleted(b"e-deleted"),
1569            TestEntry::small(b"f-small2", &[0x33; 300]),
1570            TestEntry::inline(b"g-inline2", b"mini"),
1571            TestEntry::medium(b"h-medium2", &[0x44; 16384]),
1572        ];
1573        sort_entries(&mut entries);
1574
1575        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1576        assert_eq!(meta.entries, 8);
1577
1578        let sst = open_sst(dir.path(), 1, &meta)?;
1579        let kc = make_cache();
1580        let vc = make_cache();
1581
1582        for entry in &entries {
1583            assert_lookup(&sst, entry, &kc, &vc)?;
1584        }
1585        Ok(())
1586    }
1587
1588    #[test]
1589    fn is_full_entry_count_limit() {
1590        let dir = tempfile::tempdir().unwrap();
1591        let sst_path = dir.path().join("test.sst");
1592        let mut writer =
1593            StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();
1594
1595        let max_entries = 50;
1596        for i in 0..max_entries {
1597            let key = format!("k{i:06}");
1598            let entry = TestEntry::inline(key.as_bytes(), &[0; 4]);
1599            writer.add(entry).unwrap();
1600        }
1601
1602        assert_eq!(writer.entry_count, max_entries as u64);
1603        assert!(
1604            writer.is_full(max_entries, usize::MAX),
1605            "Should be full when entry count reaches max_entries"
1606        );
1607        assert!(
1608            !writer.is_full(max_entries + 1, usize::MAX),
1609            "Should not be full when limit is higher"
1610        );
1611        writer.close().unwrap();
1612    }
1613
1614    #[test]
1615    fn is_full_data_size_limit() {
1616        let dir = tempfile::tempdir().unwrap();
1617        let sst_path = dir.path().join("test.sst");
1618        let mut writer =
1619            StreamingSstWriter::new(&sst_path, MetaEntryFlags::default(), 100).unwrap();
1620
1621        let value = vec![0u8; 1000];
1622        for i in 0..10 {
1623            let key = format!("k{i:06}");
1624            let entry = TestEntry::small(key.as_bytes(), &value);
1625            writer.add(entry).unwrap();
1626        }
1627
1628        let total = writer.total_key_size + writer.total_value_size;
1629        assert!(total > 10_000, "total data should exceed 10KB");
1630        assert!(writer.is_full(usize::MAX, total - 1));
1631        assert!(!writer.is_full(usize::MAX, total + 1));
1632        writer.close().unwrap();
1633    }
1634
1635    #[test]
1636    fn write_static_stored_file_matches_streaming() -> Result<()> {
1637        let dir = tempfile::tempdir()?;
1638
1639        let mut entries: Vec<TestEntry> = (0..100)
1640            .map(|i| {
1641                let key = format!("rkey-{i:04}");
1642                if i % 3 == 0 {
1643                    TestEntry::inline(key.as_bytes(), &[(i & 0xFF) as u8; 4])
1644                } else if i % 3 == 1 {
1645                    TestEntry::small(key.as_bytes(), &[(i & 0xFF) as u8; 200])
1646                } else {
1647                    TestEntry::medium(key.as_bytes(), &[(i & 0xFF) as u8; 8192])
1648                }
1649            })
1650            .collect();
1651        sort_entries(&mut entries);
1652
1653        // Write via convenience function
1654        let batch_path = dir.path().join("00000001.sst");
1655        let (meta1, _) =
1656            write_static_stored_file(&entries, &batch_path, MetaEntryFlags::default())?;
1657
1658        // Write via streaming API
1659        let streaming_path = dir.path().join("00000002.sst");
1660        let mut writer = StreamingSstWriter::new(
1661            &streaming_path,
1662            MetaEntryFlags::default(),
1663            entries.len() as u64,
1664        )?;
1665        for entry in &entries {
1666            writer.add(entry)?;
1667        }
1668        let (meta2, _) = writer.close()?;
1669
1670        // Metadata should match
1671        assert_eq!(meta1.entries, meta2.entries);
1672        assert_eq!(meta1.min_hash, meta2.min_hash);
1673        assert_eq!(meta1.max_hash, meta2.max_hash);
1674        assert_eq!(meta1.block_count, meta2.block_count);
1675
1676        // Both files should produce the same lookup results
1677        let sst1 = StaticSortedFile::open(
1678            dir.path(),
1679            StaticSortedFileMetaData {
1680                sequence_number: 1,
1681                block_count: meta1.block_count,
1682            },
1683        )?;
1684        let sst2 = StaticSortedFile::open(
1685            dir.path(),
1686            StaticSortedFileMetaData {
1687                sequence_number: 2,
1688                block_count: meta2.block_count,
1689            },
1690        )?;
1691        let kc = make_cache();
1692        let vc = make_cache();
1693
1694        for entry in &entries {
1695            let r1 = sst1.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
1696            let r2 = sst2.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?;
1697            match (&r1, &r2) {
1698                (SstLookupResult::Found(v1), SstLookupResult::Found(v2))
1699                    if v1.len() == 1 && v2.len() == 1 =>
1700                {
1701                    match (&v1[0], &v2[0]) {
1702                        (
1703                            LookupValue::Slice { value: val1 },
1704                            LookupValue::Slice { value: val2 },
1705                        ) => {
1706                            assert_eq!(
1707                                val1.as_ref(),
1708                                val2.as_ref(),
1709                                "Value mismatch for key {:?}",
1710                                std::str::from_utf8(&entry.key)
1711                            );
1712                        }
1713                        (LookupValue::Deleted, LookupValue::Deleted) => {}
1714                        (
1715                            LookupValue::Blob {
1716                                sequence_number: s1,
1717                            },
1718                            LookupValue::Blob {
1719                                sequence_number: s2,
1720                            },
1721                        ) => {
1722                            assert_eq!(s1, s2);
1723                        }
1724                        _ => panic!(
1725                            "Mismatched results for key {:?}",
1726                            std::str::from_utf8(&entry.key)
1727                        ),
1728                    }
1729                }
1730                _ => panic!(
1731                    "Mismatched results for key {:?}",
1732                    std::str::from_utf8(&entry.key)
1733                ),
1734            }
1735        }
1736        Ok(())
1737    }
1738
1739    #[test]
1740    #[should_panic(expected = "StreamingSstWriter::close() called with no entries")]
1741    fn close_empty_writer_panics() {
1742        let dir = tempfile::tempdir().unwrap();
1743        let sst_path = dir.path().join("empty.sst");
1744        let writer =
1745            StreamingSstWriter::<TestEntry>::new(&sst_path, MetaEntryFlags::default(), 0).unwrap();
1746        writer.close().unwrap();
1747    }
1748
1749    #[test]
1750    fn key_block_boundary_at_max_entries() -> Result<()> {
1751        let dir = tempfile::tempdir()?;
1752        let count = MAX_KEY_BLOCK_ENTRIES + 1;
1753        let mut entries: Vec<TestEntry> = (0..count)
1754            .map(|i| {
1755                let key = format!("boundary-{i:06}");
1756                TestEntry::inline(key.as_bytes(), &[0u8; 4])
1757            })
1758            .collect();
1759        sort_entries(&mut entries);
1760
1761        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1762        assert_eq!(meta.entries, count as u64);
1763        // count > MAX_KEY_BLOCK_ENTRIES so we need at least 2 key blocks plus 1 index block
1764        assert!(
1765            meta.block_count >= 3,
1766            "expected at least 2 key blocks + 1 index block"
1767        );
1768
1769        let sst = open_sst(dir.path(), 1, &meta)?;
1770        let kc = make_cache();
1771        let vc = make_cache();
1772        for entry in &entries {
1773            assert_lookup(&sst, entry, &kc, &vc)?;
1774        }
1775        Ok(())
1776    }
1777
1778    #[test]
1779    fn single_medium_raw_entry() -> Result<()> {
1780        let dir = tempfile::tempdir()?;
1781        let value = vec![0xBE; 8192];
1782        let mut entries = vec![TestEntry::medium_raw(b"rkey", &value)];
1783        sort_entries(&mut entries);
1784
1785        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default())?;
1786        assert_eq!(meta.entries, 1);
1787
1788        let sst = open_sst(dir.path(), 1, &meta)?;
1789        let kc = make_cache();
1790        let vc = make_cache();
1791        assert_lookup(&sst, &entries[0], &kc, &vc)?;
1792        Ok(())
1793    }
1794
1795    /// Flip a single byte in an SST file at the given position.
1796    fn corrupt_sst_byte(dir: &Path, seq: u32, pos: u64) {
1797        use std::io::{Seek, SeekFrom, Write as _};
1798
1799        let sst_path = dir.join(format!("{seq:08}.sst"));
1800        let file_bytes = std::fs::read(&sst_path).unwrap();
1801        let original = file_bytes[pos as usize];
1802        let mut file = std::fs::OpenOptions::new()
1803            .write(true)
1804            .open(&sst_path)
1805            .unwrap();
1806        file.seek(SeekFrom::Start(pos)).unwrap();
1807        file.write_all(&[original ^ 0xFF]).unwrap();
1808        file.sync_all().unwrap();
1809    }
1810
1811    /// Assert that looking up the first entry in a corrupted SST returns a corruption error.
1812    fn assert_corruption_detected(
1813        dir: &Path,
1814        seq: u32,
1815        meta: &StaticSortedFileBuilderMeta<'_>,
1816        entries: &[TestEntry],
1817    ) {
1818        let sst = open_sst(dir, seq, meta).unwrap();
1819        let kc = make_cache();
1820        let vc = make_cache();
1821        match sst.lookup::<_, false>(entries[0].hash, &entries[0].key, &kc, &vc) {
1822            Err(err) => {
1823                let msg = format!("{err}");
1824                assert!(
1825                    msg.contains("corruption"),
1826                    "Expected corruption error, got: {msg}"
1827                );
1828            }
1829            Ok(_) => panic!("Expected checksum error, but lookup succeeded"),
1830        }
1831    }
1832
1833    #[test]
1834    fn checksum_detects_corrupted_compressed_block() {
1835        let dir = tempfile::tempdir().unwrap();
1836        // Medium value is large enough to get its own value block, which will be compressed
1837        let value = vec![0xCD; 8192];
1838        let entries = vec![TestEntry::medium(b"mkey", &value)];
1839
1840        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();
1841
1842        // Corrupt the stored checksum of the first block (bytes 4..8).
1843        // This guarantees a mismatch regardless of whether LZ4 decompression succeeds.
1844        corrupt_sst_byte(dir.path(), 1, 4);
1845        assert_corruption_detected(dir.path(), 1, &meta, &entries);
1846    }
1847
1848    #[test]
1849    fn checksum_detects_corrupted_uncompressed_block() {
1850        let dir = tempfile::tempdir().unwrap();
1851        // Single inline entry - the key block will be small and likely stored uncompressed
1852        let entries = vec![TestEntry::inline(b"key1", b"val1")];
1853
1854        let meta = write_sst(dir.path(), 1, &entries, MetaEntryFlags::default()).unwrap();
1855
1856        // Corrupt a byte in the first block's data (after the 8-byte header)
1857        corrupt_sst_byte(dir.path(), 1, BLOCK_HEADER_SIZE as u64 + 1);
1858        assert_corruption_detected(dir.path(), 1, &meta, &entries);
1859    }
1860}