pageserver: handle decompression outside vectored read_blobs (#8942)

Part of #8130.

## Problem

Currently, decompression is performed within the `read_blobs`
implementation and the decompressed blob will be appended to the end of
the `BytesMut` buffer. We will lose this flexibility of extending the
buffer when we switch to using our own dio-aligned buffer (WIP in
https://github.com/neondatabase/neon/pull/8730). To facilitate the
adoption of aligned buffer, we need to refactor the code to perform
decompression outside `read_blobs`.

## Summary of changes

- `VectoredBlobReader::read_blobs` will return `VectoredBlob` without
performing decompression and appending decompressed blob. It becomes the
caller's responsibility to decompress the buffer.
- Added a new `BufView` type that functions as `Cow<Bytes, &[u8]>`.
- Perform decompression within `VectoredBlob::read` so that people don't
have to explicitly thinking about compression when using the reader
interface.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-09-24 12:41:38 -04:00
committed by GitHub
parent 2f7cecaf6a
commit 4f67b0225b
3 changed files with 200 additions and 61 deletions

View File

@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
@@ -1021,13 +1021,30 @@ impl DeltaLayerInner {
continue;
}
};
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let blob_read = meta.read(&view).await;
let blob_read = match blob_read {
Ok(buf) => buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
let value = Value::des(&blob_read);
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
@@ -1243,21 +1260,21 @@ impl DeltaLayerInner {
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf, ctx).await?;
let view = BufView::new_slice(&res.buf);
for blob in res.blobs {
let key = blob.meta.key;
let lsn = blob.meta.lsn;
let data = &res.buf[blob.start..blob.end];
let data = blob.read(&view).await?;
#[cfg(debug_assertions)]
Value::des(data)
Value::des(&data)
.with_context(|| {
format!(
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
blob.meta.key,
blob.meta.lsn,
blob.start,
blob.end,
utils::Hex(data)
"blob failed to deserialize for {}: {:?}",
blob,
utils::Hex(&data)
)
})
.unwrap();
@@ -1265,15 +1282,15 @@ impl DeltaLayerInner {
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = crate::repository::ValueBytes::will_init(data)
let will_init = crate::repository::ValueBytes::will_init(&data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
per_blob_copy.clear();
per_blob_copy.extend_from_slice(data);
per_blob_copy.extend_from_slice(&data);
let (tmp, res) = writer
.put_value_bytes(
@@ -1538,8 +1555,11 @@ impl<'a> DeltaLayerIterator<'a> {
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let value = Value::des(&frozen_buf[meta.start..meta.end])?;
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
}
self.key_values_batch = next_batch;
@@ -1916,9 +1936,13 @@ pub(crate) mod test {
let blobs_buf = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
.await?;
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let value = &blobs_buf.buf[meta.start..meta.end];
assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
let value = meta.read(&view).await?;
assert_eq!(
&value[..],
&entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
);
}
buf = Some(blobs_buf.buf);

View File

@@ -36,7 +36,8 @@ use crate::tenant::disk_btree::{
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
@@ -547,15 +548,15 @@ impl ImageLayerInner {
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
let img_buf = meta.read(&view).await?;
key_count += 1;
writer
.put_image(meta.meta.key, img_buf, ctx)
.put_image(meta.meta.key, img_buf.into_bytes(), ctx)
.await
.context(format!("Storing key {}", meta.meta.key))?;
}
@@ -602,13 +603,28 @@ impl ImageLayerInner {
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
let img_buf = meta.read(&view).await;
let img_buf = match img_buf {
Ok(img_buf) => img_buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
))),
);
continue;
}
};
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
Value::Image(img_buf.into_bytes()),
);
}
}
@@ -1025,10 +1041,15 @@ impl<'a> ImageLayerIterator<'a> {
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf: Bytes = blobs_buf.buf.freeze();
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
let img_buf = meta.read(&view).await?;
next_batch.push_back((
meta.meta.key,
self.image_layer.lsn,
Value::Image(img_buf.into_bytes()),
));
}
self.key_values_batch = next_batch;
Ok(())

View File

@@ -16,8 +16,9 @@
//! Note that the vectored blob api does *not* go through the page cache.
use std::collections::BTreeMap;
use std::ops::Deref;
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -35,11 +36,123 @@ pub struct BlobMeta {
pub lsn: Lsn,
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]
/// A view into the vectored blobs read buffer.
#[derive(Clone, Debug)]
pub(crate) enum BufView<'a> {
Slice(&'a [u8]),
Bytes(bytes::Bytes),
}
impl<'a> BufView<'a> {
/// Creates a new slice-based view on the blob.
pub fn new_slice(slice: &'a [u8]) -> Self {
Self::Slice(slice)
}
/// Creates a new [`bytes::Bytes`]-based view on the blob.
pub fn new_bytes(bytes: bytes::Bytes) -> Self {
Self::Bytes(bytes)
}
/// Convert the view into `Bytes`.
///
/// If using slice as the underlying storage, the copy will be an O(n) operation.
pub fn into_bytes(self) -> Bytes {
match self {
BufView::Slice(slice) => Bytes::copy_from_slice(slice),
BufView::Bytes(bytes) => bytes,
}
}
/// Creates a sub-view of the blob based on the range.
fn view(&self, range: std::ops::Range<usize>) -> Self {
match self {
BufView::Slice(slice) => BufView::Slice(&slice[range]),
BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
}
}
}
impl<'a> Deref for BufView<'a> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
match self {
BufView::Slice(slice) => slice,
BufView::Bytes(bytes) => bytes,
}
}
}
impl<'a> AsRef<[u8]> for BufView<'a> {
fn as_ref(&self) -> &[u8] {
match self {
BufView::Slice(slice) => slice,
BufView::Bytes(bytes) => bytes.as_ref(),
}
}
}
impl<'a> From<&'a [u8]> for BufView<'a> {
fn from(value: &'a [u8]) -> Self {
Self::new_slice(value)
}
}
impl From<Bytes> for BufView<'_> {
fn from(value: Bytes) -> Self {
Self::new_bytes(value)
}
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed,
/// subject to [`VectoredBlob::compression_bits`].
pub struct VectoredBlob {
pub start: usize,
pub end: usize,
/// Blob metadata.
pub meta: BlobMeta,
/// Start offset.
start: usize,
/// End offset.
end: usize,
/// Compression used on the the blob.
compression_bits: u8,
}
impl VectoredBlob {
/// Reads a decompressed view of the blob.
pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
let view = buf.view(self.start..self.end);
match self.compression_bits {
BYTE_UNCOMPRESSED => Ok(view),
BYTE_ZSTD => {
let mut decompressed_vec = Vec::new();
let mut decoder =
async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
decoder.write_all(&view).await?;
decoder.flush().await?;
// Zero-copy conversion from `Vec` to `Bytes`
Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
}
bits => {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end),
);
Err(error)
}
}
}
}
impl std::fmt::Display for VectoredBlob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}@{}, {}..{}",
self.meta.key, self.meta.lsn, self.start, self.end
)
}
}
/// Return type of [`VectoredBlobReader::read_blobs`]
@@ -514,7 +627,7 @@ impl<'a> VectoredBlobReader<'a> {
);
}
let mut buf = self
let buf = self
.file
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
.await?
@@ -529,9 +642,6 @@ impl<'a> VectoredBlobReader<'a> {
// of a blob is implicit: the start of the next blob if one exists
// or the end of the read.
// Some scratch space, put here for reusing the allocation
let mut decompressed_vec = Vec::new();
for (blob_start, meta) in blobs_at {
let blob_start_in_buf = blob_start - start_offset;
let first_len_byte = buf[blob_start_in_buf as usize];
@@ -557,35 +667,14 @@ impl<'a> VectoredBlobReader<'a> {
)
};
let start_raw = blob_start_in_buf + size_length;
let end_raw = start_raw + blob_size;
let (start, end);
if compression_bits == BYTE_UNCOMPRESSED {
start = start_raw as usize;
end = end_raw as usize;
} else if compression_bits == BYTE_ZSTD {
let mut decoder =
async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
decoder
.write_all(&buf[start_raw as usize..end_raw as usize])
.await?;
decoder.flush().await?;
start = buf.len();
buf.extend_from_slice(&decompressed_vec);
end = buf.len();
decompressed_vec.clear();
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
}
let start = (blob_start_in_buf + size_length) as usize;
let end = start + blob_size as usize;
metas.push(VectoredBlob {
start,
end,
meta: *meta,
compression_bits,
});
}
@@ -1020,8 +1109,13 @@ mod tests {
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
assert_eq!(result.blobs.len(), 1);
let read_blob = &result.blobs[0];
let read_buf = &result.buf[read_blob.start..read_blob.end];
assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
let view = BufView::new_slice(&result.buf);
let read_buf = read_blob.read(&view).await?;
assert_eq!(
&blob[..],
&read_buf[..],
"mismatch for idx={idx} at offset={offset}"
);
buf = result.buf;
}
Ok(())