From 4af0b9b3877b27f18b00712d0aff1e56475ed3ec Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Wed, 16 Apr 2025 19:10:15 +0200
Subject: [PATCH] pageserver: don't recompress images in
 `ImageLayerInner::filter()` (#11592)

## Problem

During shard ancestor compaction, we currently recompress all page images
as we move them into a new layer file. This is expensive and unnecessary.

Resolves #11562.
Requires #11607.

## Summary of changes

Pass through compressed page images in `ImageLayerInner::filter()`.
---
 pageserver/src/tenant/blob_io.rs             | 28 ++++++++
 .../src/tenant/storage_layer/image_layer.rs  | 69 +++++++++++++++++--
 pageserver/src/tenant/vectored_blob_io.rs    |  1 -
 3 files changed, 93 insertions(+), 5 deletions(-)

diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index d1dd105b13..3483a9f31e 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -446,6 +446,34 @@ impl BlobWriter {
         };
         (srcbuf, res.map(|_| (offset, compression_info)))
     }
+
+    /// Writes a raw blob containing both header and data, returning its offset.
+    pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
+        &mut self,
+        raw_with_header: FullSlice<Buf>,
+        ctx: &RequestContext,
+    ) -> (FullSlice<Buf>, Result<u64, std::io::Error>) {
+        // Verify the header, to ensure we don't write invalid/corrupt data.
+        let header = match Header::decode(&raw_with_header) {
+            Ok(header) => header,
+            Err(err) => return (raw_with_header, Err(err)),
+        };
+        if raw_with_header.len() != header.total_len() {
+            let header_total_len = header.total_len();
+            let raw_len = raw_with_header.len();
+            return (
+                raw_with_header,
+                Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    format!("header length mismatch: {header_total_len} != {raw_len}"),
+                )),
+            );
+        }
+
+        let offset = self.offset;
+        let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
+        (raw_with_header, result.map(|_| offset))
+    }
 }
 
 impl BlobWriter {
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 3744d615f2..c2de20b5b3 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -559,11 +559,12 @@ impl ImageLayerInner {
         let view = BufView::new_slice(&blobs_buf.buf);
 
         for meta in blobs_buf.blobs.iter() {
-            let img_buf = meta.read(&view).await?;
-
+            // Just read the raw header+data and pass it through to the target layer, without
+            // decoding and recompressing it.
+            let raw = meta.raw_with_header(&view);
             key_count += 1;
             writer
-                .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
+                .put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
                 .await
                 .context(format!("Storing key {}", meta.meta.key))?;
         }
@@ -853,6 +854,41 @@ impl ImageLayerWriterInner {
         Ok(())
     }
 
+    ///
+    /// Write the next image to the file, as a raw blob header and data.
+    ///
+    /// The page versions must be appended in blknum order.
+    ///
+    async fn put_image_raw(
+        &mut self,
+        key: Key,
+        raw_with_header: Bytes,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        ensure!(self.key_range.contains(&key));
+
+        // NB: we don't update the (un)compressed metrics, since we can't determine them without
+        // decompressing the image. This seems okay.
+        self.num_keys += 1;
+
+        let (_, res) = self
+            .blob_writer
+            .write_blob_raw(raw_with_header.slice_len(), ctx)
+            .await;
+        let offset = res?;
+
+        let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
+        key.write_to_byte_slice(&mut keybuf);
+        self.tree.append(&keybuf, offset)?;
+
+        #[cfg(feature = "testing")]
+        {
+            self.last_written_key = key;
+        }
+
+        Ok(())
+    }
+
     ///
     /// Finish writing the image layer.
     ///
@@ -888,7 +924,13 @@
         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
             .inc_by(self.uncompressed_bytes_eligible);
         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
-        crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
+
+        // NB: filter() may pass through raw pages from a different layer, without looking at
+        // whether these are compressed or not. We don't track metrics for these, so avoid
+        // increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
+        if self.uncompressed_bytes > 0 {
+            crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
+        };
 
         let mut file = self.blob_writer.into_inner();
 
@@ -1034,6 +1076,25 @@ impl ImageLayerWriter {
         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     }
 
+    ///
+    /// Write the next value to the file, as a raw header and data. This allows passing through a
+    /// raw, potentially compressed image from a different layer file without recompressing it.
+    ///
+    /// The page versions must be appended in blknum order.
+    ///
+    pub async fn put_image_raw(
+        &mut self,
+        key: Key,
+        raw_with_header: Bytes,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        self.inner
+            .as_mut()
+            .unwrap()
+            .put_image_raw(key, raw_with_header, ctx)
+            .await
+    }
+
     /// Estimated size of the image layer.
     pub(crate) fn estimated_size(&self) -> u64 {
         let inner = self.inner.as_ref().unwrap();
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 8e535a55d7..f9a44fe4ca 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -151,7 +151,6 @@ impl VectoredBlob {
     }
 
     /// Returns the raw blob including header.
-    #[allow(unused)]
     pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
         buf.view(self.header_start..self.end)
     }
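
Reviewer note (illustration, not part of the patch): the pass-through works because each blob on disk is self-describing, with a small header recording the compression bits and the payload length, so `filter()` can copy header plus payload verbatim into the target layer without ever decompressing the image. The sketch below models the invariant that `write_blob_raw` enforces, using simplified stand-in types; the `Header` byte layout here is hypothetical and does not match the real `blob_io` encoding.

```rust
use std::io::{Error, ErrorKind};

/// Simplified stand-in for the on-disk blob header: one tag byte
/// (compression bits) followed by a 4-byte big-endian payload length.
struct Header {
    header_len: usize,
    data_len: usize,
}

impl Header {
    fn decode(buf: &[u8]) -> Result<Header, Error> {
        if buf.len() < 5 {
            return Err(Error::new(ErrorKind::InvalidData, "truncated header"));
        }
        let data_len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
        Ok(Header { header_len: 5, data_len })
    }

    fn total_len(&self) -> usize {
        self.header_len + self.data_len
    }
}

/// Pass a blob through without decompressing it: validate that the header's
/// declared length matches the buffer, then append header + payload verbatim.
fn write_blob_raw(out: &mut Vec<u8>, raw_with_header: &[u8]) -> Result<u64, Error> {
    let header = Header::decode(raw_with_header)?;
    if raw_with_header.len() != header.total_len() {
        return Err(Error::new(
            ErrorKind::InvalidData,
            format!(
                "header length mismatch: {} != {}",
                header.total_len(),
                raw_with_header.len()
            ),
        ));
    }
    let offset = out.len() as u64;
    // The payload may be compressed; it is never inspected or recompressed.
    out.extend_from_slice(raw_with_header);
    Ok(offset)
}

fn main() -> Result<(), Error> {
    // A fake "compressed" blob: tag 0x01, length 3, payload [0xAA, 0xBB, 0xCC].
    let blob = [0x01, 0, 0, 0, 3, 0xAA, 0xBB, 0xCC];
    let mut layer = Vec::new();
    let offset = write_blob_raw(&mut layer, &blob)?;
    // Round trip: the target layer holds the source bytes unchanged.
    assert_eq!(offset, 0);
    assert_eq!(&layer[..], &blob[..]);
    Ok(())
}
```

The validation mirrors the patch: decode the header, check the declared total length against the buffer, and only then write the bytes unchanged, so a corrupt or truncated blob is rejected instead of being written through to the new layer file.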