Skip to main content

turbo_persistence/
compression.rs

1use std::{mem::MaybeUninit, rc::Rc, sync::Arc};
2
3use anyhow::{Context, Result};
4use lzzzz::lz4::{self, decompress};
5
6/// Decompresses `block` into `dest`, verifying the output length matches `expected_len`.
7fn decompress_block(block: &[u8], dest: &mut [u8], expected_len: u32) -> Result<()> {
8    debug_assert!(
9        expected_len > 0,
10        "decompress_block called with uncompressed_length=0; uncompressed blocks should use \
11         zero-copy mmap path"
12    );
13    let bytes_written = decompress(block, dest).with_context(|| {
14        format!(
15            "Failed to decompress block ({} bytes compressed, {} bytes uncompressed)",
16            block.len(),
17            expected_len
18        )
19    })?;
20    assert_eq!(
21        bytes_written, expected_len as usize,
22        "Decompressed length does not match expected length"
23    );
24    Ok(())
25}
26
27/// Decompresses a block into an Arc allocation.
28///
29/// The caller must ensure `uncompressed_length > 0` (i.e., the block is actually compressed).
30/// Uncompressed blocks should be handled via zero-copy mmap slices before calling this.
31pub fn decompress_into_arc(uncompressed_length: u32, block: &[u8]) -> Result<Arc<[u8]>> {
32    // Allocate directly into an Arc to avoid a copy. The buffer is uninitialized;
33    // decompression will overwrite it completely (verified by decompress_block).
34    let buffer: Arc<[MaybeUninit<u8>]> = Arc::new_uninit_slice(uncompressed_length as usize);
35    // Safety: decompression will fully initialize the buffer (verified by the assert in
36    // decompress_block).
37    let mut buffer = unsafe { buffer.assume_init() };
38    // We just created this Arc so refcount is 1; get_mut always succeeds.
39    let dest = Arc::get_mut(&mut buffer).expect("Arc refcount should be 1");
40    decompress_block(block, dest, uncompressed_length)?;
41    Ok(buffer)
42}
43
44/// Like [`decompress_into_arc`] but returns an `Rc<[u8]>` for thread-local use.
45pub fn decompress_into_rc(uncompressed_length: u32, block: &[u8]) -> Result<Rc<[u8]>> {
46    let buffer: Rc<[MaybeUninit<u8>]> = Rc::new_uninit_slice(uncompressed_length as usize);
47    // Safety: decompression will fully initialize the buffer (verified by the assert in
48    // decompress_block).
49    let mut buffer = unsafe { buffer.assume_init() };
50    let dest = Rc::get_mut(&mut buffer).expect("Rc refcount should be 1");
51    decompress_block(block, dest, uncompressed_length)?;
52    Ok(buffer)
53}
54
55/// Computes a CRC32 checksum of a byte slice.
56pub fn checksum_block(data: &[u8]) -> u32 {
57    crc32fast::hash(data)
58}
59
60#[tracing::instrument(level = "trace", skip_all)]
61pub fn compress_into_buffer(block: &[u8], buffer: &mut Vec<u8>) -> Result<()> {
62    lz4::compress_to_vec(block, buffer, lz4::ACC_LEVEL_DEFAULT).context("Compression failed")?;
63    Ok(())
64}