diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index abeaa166a4..d1dd105b13 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -37,6 +37,63 @@ pub struct CompressionInfo { pub compressed_size: Option, } +/// A blob header, with header+data length and compression info. +/// +/// TODO: use this more widely, and add an encode() method too. +/// TODO: document the header format. +#[derive(Clone, Copy, Default)] +pub struct Header { + pub header_len: usize, + pub data_len: usize, + pub compression_bits: u8, +} + +impl Header { + /// Decodes a header from a byte slice. + pub fn decode(bytes: &[u8]) -> Result { + let Some(&first_header_byte) = bytes.first() else { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "zero-length blob header", + )); + }; + + // If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes. + if first_header_byte < 0x80 { + return Ok(Self { + header_len: 1, // by definition + data_len: first_header_byte as usize, + compression_bits: BYTE_UNCOMPRESSED, + }); + } + + // Otherwise, this is a 4-byte header containing compression information and length. + const HEADER_LEN: usize = 4; + let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("blob header too short: {bytes:?}"), + ) + })?; + + // TODO: verify the compression bits and convert to an enum. + let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK; + header_buf[0] &= !LEN_COMPRESSION_BIT_MASK; + let data_len = u32::from_be_bytes(header_buf) as usize; + + Ok(Self { + header_len: HEADER_LEN, + data_len, + compression_bits, + }) + } + + /// Returns the total header+data length. + pub fn total_len(&self) -> usize { + self.header_len + self.data_len + } +} + impl BlockCursor<'_> { /// Read a blob into a new buffer. pub async fn read_blob( diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 166917d674..8e535a55d7 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -26,7 +26,7 @@ use utils::lsn::Lsn; use utils::vec_map::VecMap; use crate::context::RequestContext; -use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK}; +use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, Header}; use crate::virtual_file::{self, IoBufferMut, VirtualFile}; /// Metadata bundled with the start and end offset of a blob. @@ -111,18 +111,20 @@ impl From for BufView<'_> { pub struct VectoredBlob { /// Blob metadata. pub meta: BlobMeta, - /// Start offset. - start: usize, + /// Header start offset. + header_start: usize, + /// Data start offset. + data_start: usize, /// End offset. end: usize, - /// Compression used on the the blob. + /// Compression used on the data, extracted from the header. compression_bits: u8, } impl VectoredBlob { /// Reads a decompressed view of the blob. pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result, std::io::Error> { - let view = buf.view(self.start..self.end); + let view = buf.view(self.data_start..self.end); match self.compression_bits { BYTE_UNCOMPRESSED => Ok(view), @@ -140,13 +142,19 @@ impl VectoredBlob { std::io::ErrorKind::InvalidData, format!( "Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", - self.meta.key, self.meta.lsn, self.start, self.end + self.meta.key, self.meta.lsn, self.data_start, self.end ), ); Err(error) } } } + + /// Returns the raw blob including header. + #[allow(unused)] + pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> { + buf.view(self.header_start..self.end) + } } impl std::fmt::Display for VectoredBlob { @@ -154,7 +162,7 @@ impl std::fmt::Display for VectoredBlob { write!( f, "{}@{}, {}..{}", - self.meta.key, self.meta.lsn, self.start, self.end + self.meta.key, self.meta.lsn, self.data_start, self.end ) } } @@ -493,50 +501,28 @@ impl<'a> VectoredBlobReader<'a> { let blobs_at = read.blobs_at.as_slice(); - let start_offset = read.start; - - let mut metas = Vec::with_capacity(blobs_at.len()); + let mut blobs = Vec::with_capacity(blobs_at.len()); // Blobs in `read` only provide their starting offset. The end offset // of a blob is implicit: the start of the next blob if one exists // or the end of the read. - for (blob_start, meta) in blobs_at { - let blob_start_in_buf = blob_start - start_offset; - let first_len_byte = buf[blob_start_in_buf as usize]; + for (blob_start, meta) in blobs_at.iter().copied() { + let header_start = (blob_start - read.start) as usize; + let header = Header::decode(&buf[header_start..])?; + let data_start = header_start + header.header_len; + let end = data_start + header.data_len; + let compression_bits = header.compression_bits; - // Each blob is prefixed by a header containing its size and compression information. - // Extract the size and skip that header to find the start of the data. - // The size can be 1 or 4 bytes. The most significant bit is 0 in the - // 1 byte case and 1 in the 4 byte case. - let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 { - (1, first_len_byte as u64, BYTE_UNCOMPRESSED) - } else { - let mut blob_size_buf = [0u8; 4]; - let offset_in_buf = blob_start_in_buf as usize; - - blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]); - blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK; - - let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK; - ( - 4, - u32::from_be_bytes(blob_size_buf) as u64, - compression_bits, - ) - }; - - let start = (blob_start_in_buf + size_length) as usize; - let end = start + blob_size as usize; - - metas.push(VectoredBlob { - start, + blobs.push(VectoredBlob { + header_start, + data_start, end, - meta: *meta, + meta, compression_bits, }); } - Ok(VectoredBlobsBuf { buf, blobs: metas }) + Ok(VectoredBlobsBuf { buf, blobs }) } } @@ -997,6 +983,15 @@ mod tests { &read_buf[..], "mismatch for idx={idx} at offset={offset}" ); + + // Check that raw_with_header returns a valid header. + let raw = read_blob.raw_with_header(&view); + let header = Header::decode(&raw)?; + if !compression || header.header_len == 1 { + assert_eq!(header.compression_bits, BYTE_UNCOMPRESSED); + } + assert_eq!(raw.len(), header.total_len()); + buf = result.buf; } Ok(())