Add decompression

This commit is contained in:
Arpad Müller
2024-06-12 16:04:39 +02:00
parent 88b24e1593
commit 40e79712eb

View File

@@ -14,6 +14,7 @@
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use postgres_ffi::BLCKSZ;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -69,12 +70,29 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= 0x7f;
len_buf[0] &= 0x0f;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & 0xf0;
dstbuf.clear();
dstbuf.reserve(len);
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 {
buf_to_write = &mut tmp_buf;
Some(dstbuf)
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
};
buf_to_write.clear();
buf_to_write.reserve(len);
// Read the payload
let mut remain = len;
@@ -88,10 +106,29 @@ impl<'a> BlockCursor<'a> {
page_remain = PAGE_SZ;
}
let this_blk_len = min(remain, page_remain);
dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
remain -= this_blk_len;
off += this_blk_len;
}
if let Some(dstbuf) = compression {
if compression_bits == BYTE_ZSTD {
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
decoder.write_all(buf_to_write).await?;
} else if compression_bits == BYTE_LZ4 {
let decompressed = lz4_flex::block::decompress(&buf_to_write, 128 as usize)
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"lz4 decompression error",
)
})?;
dstbuf.extend_from_slice(&decompressed);
} else {
unreachable!("already checked above")
}
}
Ok(())
}
}