From 4f67b0225bb946c32f5b9c8d1d96eafbb05295ca Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Tue, 24 Sep 2024 12:41:38 -0400 Subject: [PATCH] pageserver: handle decompression outside vectored `read_blobs` (#8942) Part of #8130. ## Problem Currently, decompression is performed within the `read_blobs` implementation and the decompressed blob will be appended to the end of the `BytesMut` buffer. We will lose this flexibility of extending the buffer when we switch to using our own dio-aligned buffer (WIP in https://github.com/neondatabase/neon/pull/8730). To facilitate the adoption of aligned buffer, we need to refactor the code to perform decompression outside `read_blobs`. ## Summary of changes - `VectoredBlobReader::read_blobs` will return `VectoredBlob` without performing decompression and appending decompressed blob. It becomes the caller's responsibility to decompress the buffer. - Added a new `BufView` type that functions as `Cow`. - Perform decompression within `VectoredBlob::read` so that people don't have to explicitly think about compression when using the reader interface. 
Signed-off-by: Yuchen Liang --- .../src/tenant/storage_layer/delta_layer.rs | 58 +++++-- .../src/tenant/storage_layer/image_layer.rs | 41 +++-- pageserver/src/tenant/vectored_blob_io.rs | 162 ++++++++++++++---- 3 files changed, 200 insertions(+), 61 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 34f1b15138..2b212cfed5 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::tenant::PageReconstructError; @@ -1021,13 +1021,30 @@ impl DeltaLayerInner { continue; } }; - + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter().rev() { if Some(meta.meta.key) == ignore_key_with_err { continue; } + let blob_read = meta.read(&view).await; + let blob_read = match blob_read { + Ok(buf) => buf, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::Other(anyhow!(e).context(format!( + "Failed to decompress blob from virtual file {}", + self.file.path, + ))), + ); + + ignore_key_with_err = Some(meta.meta.key); + continue; + } + }; + + let value = Value::des(&blob_read); - let value = Value::des(&blobs_buf.buf[meta.start..meta.end]); let value = match value { Ok(v) => v, Err(e) => { @@ -1243,21 +1260,21 @@ impl DeltaLayerInner { buf.reserve(read.size()); let res = reader.read_blobs(&read, buf, ctx).await?; + let view = BufView::new_slice(&res.buf); + for blob in res.blobs { let key = blob.meta.key; let lsn = blob.meta.lsn; - let data = 
&res.buf[blob.start..blob.end]; + + let data = blob.read(&view).await?; #[cfg(debug_assertions)] - Value::des(data) + Value::des(&data) .with_context(|| { format!( - "blob failed to deserialize for {}@{}, {}..{}: {:?}", - blob.meta.key, - blob.meta.lsn, - blob.start, - blob.end, - utils::Hex(data) + "blob failed to deserialize for {}: {:?}", + blob, + utils::Hex(&data) ) }) .unwrap(); @@ -1265,15 +1282,15 @@ impl DeltaLayerInner { // is it an image or will_init walrecord? // FIXME: this could be handled by threading the BlobRef to the // VectoredReadBuilder - let will_init = crate::repository::ValueBytes::will_init(data) + let will_init = crate::repository::ValueBytes::will_init(&data) .inspect_err(|_e| { #[cfg(feature = "testing")] - tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); + tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); }) .unwrap_or(false); per_blob_copy.clear(); - per_blob_copy.extend_from_slice(data); + per_blob_copy.extend_from_slice(&data); let (tmp, res) = writer .put_value_bytes( @@ -1538,8 +1555,11 @@ impl<'a> DeltaLayerIterator<'a> { .read_blobs(&plan, buf, self.ctx) .await?; let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let value = Value::des(&frozen_buf[meta.start..meta.end])?; + let blob_read = meta.read(&view).await?; + let value = Value::des(&blob_read)?; + next_batch.push_back((meta.meta.key, meta.meta.lsn, value)); } self.key_values_batch = next_batch; @@ -1916,9 +1936,13 @@ pub(crate) mod test { let blobs_buf = vectored_blob_reader .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx) .await?; + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { - let value = &blobs_buf.buf[meta.start..meta.end]; - assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]); + let value 
= meta.read(&view).await?; + assert_eq!( + &value[..], + &entries_meta.index[&(meta.meta.key, meta.meta.lsn)] + ); } buf = Some(blobs_buf.buf); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 5de2582ab7..940d169db0 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -36,7 +36,8 @@ use crate::tenant::disk_btree::{ }; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, + BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + VectoredReadPlanner, }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; @@ -547,15 +548,15 @@ impl ImageLayerInner { let buf = BytesMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?; - let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); + let img_buf = meta.read(&view).await?; key_count += 1; writer - .put_image(meta.meta.key, img_buf, ctx) + .put_image(meta.meta.key, img_buf.into_bytes(), ctx) .await .context(format!("Storing key {}", meta.meta.key))?; } @@ -602,13 +603,28 @@ impl ImageLayerInner { match res { Ok(blobs_buf) => { let frozen_buf = blobs_buf.buf.freeze(); - + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); + let img_buf = meta.read(&view).await; + + let img_buf = match img_buf { + Ok(img_buf) => img_buf, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::Other(anyhow!(e).context(format!( + "Failed to decompress blob from virtual file {}", + self.file.path, + ))), + ); + + 
continue; + } + }; reconstruct_state.update_key( &meta.meta.key, self.lsn, - Value::Image(img_buf), + Value::Image(img_buf.into_bytes()), ); } } @@ -1025,10 +1041,15 @@ impl<'a> ImageLayerIterator<'a> { let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf: Bytes = blobs_buf.buf.freeze(); + let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); - next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf))); + let img_buf = meta.read(&view).await?; + next_batch.push_back(( + meta.meta.key, + self.image_layer.lsn, + Value::Image(img_buf.into_bytes()), + )); } self.key_values_batch = next_batch; Ok(()) diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 553edf6d8b..aa37a45898 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -16,8 +16,9 @@ //! Note that the vectored blob api does *not* go through the page cache. use std::collections::BTreeMap; +use std::ops::Deref; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -35,11 +36,123 @@ pub struct BlobMeta { pub lsn: Lsn, } -/// Blob offsets into [`VectoredBlobsBuf::buf`] +/// A view into the vectored blobs read buffer. +#[derive(Clone, Debug)] +pub(crate) enum BufView<'a> { + Slice(&'a [u8]), + Bytes(bytes::Bytes), +} + +impl<'a> BufView<'a> { + /// Creates a new slice-based view on the blob. + pub fn new_slice(slice: &'a [u8]) -> Self { + Self::Slice(slice) + } + + /// Creates a new [`bytes::Bytes`]-based view on the blob. + pub fn new_bytes(bytes: bytes::Bytes) -> Self { + Self::Bytes(bytes) + } + + /// Convert the view into `Bytes`. + /// + /// If using slice as the underlying storage, the copy will be an O(n) operation. 
+ pub fn into_bytes(self) -> Bytes { + match self { + BufView::Slice(slice) => Bytes::copy_from_slice(slice), + BufView::Bytes(bytes) => bytes, + } + } + + /// Creates a sub-view of the blob based on the range. + fn view(&self, range: std::ops::Range) -> Self { + match self { + BufView::Slice(slice) => BufView::Slice(&slice[range]), + BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)), + } + } +} + +impl<'a> Deref for BufView<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + BufView::Slice(slice) => slice, + BufView::Bytes(bytes) => bytes, + } + } +} + +impl<'a> AsRef<[u8]> for BufView<'a> { + fn as_ref(&self) -> &[u8] { + match self { + BufView::Slice(slice) => slice, + BufView::Bytes(bytes) => bytes.as_ref(), + } + } +} + +impl<'a> From<&'a [u8]> for BufView<'a> { + fn from(value: &'a [u8]) -> Self { + Self::new_slice(value) + } +} + +impl From for BufView<'_> { + fn from(value: Bytes) -> Self { + Self::new_bytes(value) + } +} + +/// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed, +/// subject to [`VectoredBlob::compression_bits`]. pub struct VectoredBlob { - pub start: usize, - pub end: usize, + /// Blob metadata. pub meta: BlobMeta, + /// Start offset. + start: usize, + /// End offset. + end: usize, + /// Compression used on the the blob. + compression_bits: u8, +} + +impl VectoredBlob { + /// Reads a decompressed view of the blob. 
+ pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result, std::io::Error> { + let view = buf.view(self.start..self.end); + + match self.compression_bits { + BYTE_UNCOMPRESSED => Ok(view), + BYTE_ZSTD => { + let mut decompressed_vec = Vec::new(); + let mut decoder = + async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec); + decoder.write_all(&view).await?; + decoder.flush().await?; + // Zero-copy conversion from `Vec` to `Bytes` + Ok(BufView::new_bytes(Bytes::from(decompressed_vec))) + } + bits => { + let error = std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end), + ); + Err(error) + } + } + } +} + +impl std::fmt::Display for VectoredBlob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}@{}, {}..{}", + self.meta.key, self.meta.lsn, self.start, self.end + ) + } } /// Return type of [`VectoredBlobReader::read_blobs`] @@ -514,7 +627,7 @@ impl<'a> VectoredBlobReader<'a> { ); } - let mut buf = self + let buf = self .file .read_exact_at(buf.slice(0..read.size()), read.start, ctx) .await? @@ -529,9 +642,6 @@ impl<'a> VectoredBlobReader<'a> { // of a blob is implicit: the start of the next blob if one exists // or the end of the read. 
- // Some scratch space, put here for reusing the allocation - let mut decompressed_vec = Vec::new(); - for (blob_start, meta) in blobs_at { let blob_start_in_buf = blob_start - start_offset; let first_len_byte = buf[blob_start_in_buf as usize]; @@ -557,35 +667,14 @@ impl<'a> VectoredBlobReader<'a> { ) }; - let start_raw = blob_start_in_buf + size_length; - let end_raw = start_raw + blob_size; - let (start, end); - if compression_bits == BYTE_UNCOMPRESSED { - start = start_raw as usize; - end = end_raw as usize; - } else if compression_bits == BYTE_ZSTD { - let mut decoder = - async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec); - decoder - .write_all(&buf[start_raw as usize..end_raw as usize]) - .await?; - decoder.flush().await?; - start = buf.len(); - buf.extend_from_slice(&decompressed_vec); - end = buf.len(); - decompressed_vec.clear(); - } else { - let error = std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid compression byte {compression_bits:x}"), - ); - return Err(error); - } + let start = (blob_start_in_buf + size_length) as usize; + let end = start + blob_size as usize; metas.push(VectoredBlob { start, end, meta: *meta, + compression_bits, }); } @@ -1020,8 +1109,13 @@ mod tests { let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?; assert_eq!(result.blobs.len(), 1); let read_blob = &result.blobs[0]; - let read_buf = &result.buf[read_blob.start..read_blob.end]; - assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}"); + let view = BufView::new_slice(&result.buf); + let read_buf = read_blob.read(&view).await?; + assert_eq!( + &blob[..], + &read_buf[..], + "mismatch for idx={idx} at offset={offset}" + ); buf = result.buf; } Ok(())