mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
pageserver: add VectoredBlob::raw_with_header (#11607)
## Problem

To avoid recompressing page images during layer filtering, we need access to the raw header and data from vectored reads so that we can pass them through to the target layer.

Touches #11562.

## Summary of changes

Adds `VectoredBlob::raw_with_header()` to return a raw view of the header+data, and updates `read()` to track it.

Also adds `blob_io::Header` with header metadata and decode logic, to reuse for tests and assertions. This isn't yet widely used.
This commit is contained in:
@@ -37,6 +37,63 @@ pub struct CompressionInfo {
|
||||
pub compressed_size: Option<usize>,
|
||||
}
|
||||
|
||||
/// A blob header, with header+data length and compression info.
///
/// This is a plain value type describing a decoded header; it does not borrow from or own the
/// bytes it was decoded from.
///
/// TODO: use this more widely, and add an encode() method too.
/// TODO: document the header format.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Header {
    /// Length of the header itself, in bytes.
    pub header_len: usize,
    /// Length of the data that follows the header, in bytes, as recorded in the header.
    pub data_len: usize,
    /// Compression bits extracted from the header.
    pub compression_bits: u8,
}
|
||||
|
||||
impl Header {
|
||||
/// Decodes a header from a byte slice.
|
||||
pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
|
||||
let Some(&first_header_byte) = bytes.first() else {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"zero-length blob header",
|
||||
));
|
||||
};
|
||||
|
||||
// If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
|
||||
if first_header_byte < 0x80 {
|
||||
return Ok(Self {
|
||||
header_len: 1, // by definition
|
||||
data_len: first_header_byte as usize,
|
||||
compression_bits: BYTE_UNCOMPRESSED,
|
||||
});
|
||||
}
|
||||
|
||||
// Otherwise, this is a 4-byte header containing compression information and length.
|
||||
const HEADER_LEN: usize = 4;
|
||||
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("blob header too short: {bytes:?}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
// TODO: verify the compression bits and convert to an enum.
|
||||
let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
|
||||
header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
|
||||
let data_len = u32::from_be_bytes(header_buf) as usize;
|
||||
|
||||
Ok(Self {
|
||||
header_len: HEADER_LEN,
|
||||
data_len,
|
||||
compression_bits,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the total header+data length.
|
||||
pub fn total_len(&self) -> usize {
|
||||
self.header_len + self.data_len
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockCursor<'_> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
|
||||
@@ -26,7 +26,7 @@ use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
|
||||
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, Header};
|
||||
use crate::virtual_file::{self, IoBufferMut, VirtualFile};
|
||||
|
||||
/// Metadata bundled with the start and end offset of a blob.
|
||||
@@ -111,18 +111,20 @@ impl From<Bytes> for BufView<'_> {
|
||||
pub struct VectoredBlob {
|
||||
/// Blob metadata.
|
||||
pub meta: BlobMeta,
|
||||
/// Start offset.
|
||||
start: usize,
|
||||
/// Header start offset.
|
||||
header_start: usize,
|
||||
/// Data start offset.
|
||||
data_start: usize,
|
||||
/// End offset.
|
||||
end: usize,
|
||||
/// Compression used on the the blob.
|
||||
/// Compression used on the data, extracted from the header.
|
||||
compression_bits: u8,
|
||||
}
|
||||
|
||||
impl VectoredBlob {
|
||||
/// Reads a decompressed view of the blob.
|
||||
pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
|
||||
let view = buf.view(self.start..self.end);
|
||||
let view = buf.view(self.data_start..self.end);
|
||||
|
||||
match self.compression_bits {
|
||||
BYTE_UNCOMPRESSED => Ok(view),
|
||||
@@ -140,13 +142,19 @@ impl VectoredBlob {
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}",
|
||||
self.meta.key, self.meta.lsn, self.start, self.end
|
||||
self.meta.key, self.meta.lsn, self.data_start, self.end
|
||||
),
|
||||
);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the raw blob including header.
|
||||
#[allow(unused)]
|
||||
pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
|
||||
buf.view(self.header_start..self.end)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for VectoredBlob {
|
||||
@@ -154,7 +162,7 @@ impl std::fmt::Display for VectoredBlob {
|
||||
write!(
|
||||
f,
|
||||
"{}@{}, {}..{}",
|
||||
self.meta.key, self.meta.lsn, self.start, self.end
|
||||
self.meta.key, self.meta.lsn, self.data_start, self.end
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -493,50 +501,28 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
|
||||
let blobs_at = read.blobs_at.as_slice();
|
||||
|
||||
let start_offset = read.start;
|
||||
|
||||
let mut metas = Vec::with_capacity(blobs_at.len());
|
||||
let mut blobs = Vec::with_capacity(blobs_at.len());
|
||||
// Blobs in `read` only provide their starting offset. The end offset
|
||||
// of a blob is implicit: the start of the next blob if one exists
|
||||
// or the end of the read.
|
||||
|
||||
for (blob_start, meta) in blobs_at {
|
||||
let blob_start_in_buf = blob_start - start_offset;
|
||||
let first_len_byte = buf[blob_start_in_buf as usize];
|
||||
for (blob_start, meta) in blobs_at.iter().copied() {
|
||||
let header_start = (blob_start - read.start) as usize;
|
||||
let header = Header::decode(&buf[header_start..])?;
|
||||
let data_start = header_start + header.header_len;
|
||||
let end = data_start + header.data_len;
|
||||
let compression_bits = header.compression_bits;
|
||||
|
||||
// Each blob is prefixed by a header containing its size and compression information.
|
||||
// Extract the size and skip that header to find the start of the data.
|
||||
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
|
||||
// 1 byte case and 1 in the 4 byte case.
|
||||
let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
|
||||
(1, first_len_byte as u64, BYTE_UNCOMPRESSED)
|
||||
} else {
|
||||
let mut blob_size_buf = [0u8; 4];
|
||||
let offset_in_buf = blob_start_in_buf as usize;
|
||||
|
||||
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
|
||||
blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
|
||||
|
||||
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
|
||||
(
|
||||
4,
|
||||
u32::from_be_bytes(blob_size_buf) as u64,
|
||||
compression_bits,
|
||||
)
|
||||
};
|
||||
|
||||
let start = (blob_start_in_buf + size_length) as usize;
|
||||
let end = start + blob_size as usize;
|
||||
|
||||
metas.push(VectoredBlob {
|
||||
start,
|
||||
blobs.push(VectoredBlob {
|
||||
header_start,
|
||||
data_start,
|
||||
end,
|
||||
meta: *meta,
|
||||
meta,
|
||||
compression_bits,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(VectoredBlobsBuf { buf, blobs: metas })
|
||||
Ok(VectoredBlobsBuf { buf, blobs })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -997,6 +983,15 @@ mod tests {
|
||||
&read_buf[..],
|
||||
"mismatch for idx={idx} at offset={offset}"
|
||||
);
|
||||
|
||||
// Check that raw_with_header returns a valid header.
|
||||
let raw = read_blob.raw_with_header(&view);
|
||||
let header = Header::decode(&raw)?;
|
||||
if !compression || header.header_len == 1 {
|
||||
assert_eq!(header.compression_bits, BYTE_UNCOMPRESSED);
|
||||
}
|
||||
assert_eq!(raw.len(), header.total_len());
|
||||
|
||||
buf = result.buf;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user