Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-22 21:59:59 +00:00.
# pageserver: don't recompress images in ImageLayerInner::filter() (#11592)
## Problem

During shard ancestor compaction, we currently recompress all page images as we move them into a new layer file. This is expensive and unnecessary.

Resolves #11562. Requires #11607.

## Summary of changes

Pass through compressed page images in `ImageLayerInner::filter()`.
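To make the intent concrete, here is a minimal, self-contained sketch of the idea. The blob format, types, and function names below are hypothetical illustrations for this writeup, not the pageserver's actual encoding or API: the old path decodes each blob so the destination writer can re-encode (and potentially recompress) it, while the new path only validates the header and copies the raw bytes through unchanged.

```rust
// Hypothetical, self-contained illustration, NOT the pageserver's real blob
// format or API. A "blob" here is a 5-byte header (1 flag byte plus a 4-byte
// big-endian payload length) followed by the payload, which may already be
// compressed.

struct Header {
    compressed: bool,
    len: usize,
}

impl Header {
    const SIZE: usize = 5;

    fn decode(buf: &[u8]) -> Option<Header> {
        let flag = *buf.first()?;
        let len = u32::from_be_bytes(buf.get(1..Self::SIZE)?.try_into().ok()?) as usize;
        Some(Header { compressed: flag == 1, len })
    }

    /// Total length of the blob: header plus payload.
    fn total_len(&self) -> usize {
        Self::SIZE + self.len
    }
}

/// Old-style path: decode the payload so the destination writer can re-encode
/// it. In the real code this is where the payload would be decompressed and
/// then recompressed, which is exactly the work the change avoids.
fn copy_reencoding(src: &[u8], dst: &mut Vec<u8>) {
    let header = Header::decode(src).expect("valid header");
    let payload = &src[Header::SIZE..header.total_len()];
    // (decompress `payload`, re-encode, recompress...)
    dst.push(if header.compressed { 1 } else { 0 });
    dst.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    dst.extend_from_slice(payload);
}

/// New-style path: validate the header, then copy header plus payload
/// verbatim, leaving any existing compression untouched.
fn copy_raw(src: &[u8], dst: &mut Vec<u8>) {
    let header = Header::decode(src).expect("valid header");
    assert_eq!(src.len(), header.total_len(), "header length mismatch");
    dst.extend_from_slice(src);
}

fn main() {
    // A blob whose 4-byte payload is flagged as compressed.
    let mut blob = vec![1u8];
    blob.extend_from_slice(&4u32.to_be_bytes());
    blob.extend_from_slice(b"zstd");

    let (mut a, mut b) = (Vec::new(), Vec::new());
    copy_reencoding(&blob, &mut a);
    copy_raw(&blob, &mut b);
    assert_eq!(a, b); // same bytes on disk, but copy_raw never touched the payload
}
```

The diff below applies the same idea to `BlobWriter`, `ImageLayerWriterInner`, `ImageLayerWriter`, and `ImageLayerInner::filter()`.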
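First, `BlobWriter` gains a `write_blob_raw` method that writes an already-encoded blob (header plus data) verbatim, validating the header before anything hits the file: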
```diff
@@ -446,6 +446,34 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         };
         (srcbuf, res.map(|_| (offset, compression_info)))
     }
+
+    /// Writes a raw blob containing both header and data, returning its offset.
+    pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
+        &mut self,
+        raw_with_header: FullSlice<Buf>,
+        ctx: &RequestContext,
+    ) -> (FullSlice<Buf>, Result<u64, Error>) {
+        // Verify the header, to ensure we don't write invalid/corrupt data.
+        let header = match Header::decode(&raw_with_header) {
+            Ok(header) => header,
+            Err(err) => return (raw_with_header, Err(err)),
+        };
+        if raw_with_header.len() != header.total_len() {
+            let header_total_len = header.total_len();
+            let raw_len = raw_with_header.len();
+            return (
+                raw_with_header,
+                Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    format!("header length mismatch: {header_total_len} != {raw_len}"),
+                )),
+            );
+        }
+
+        let offset = self.offset;
+        let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
+        (raw_with_header, result.map(|_| offset))
+    }
 }
 
 impl BlobWriter<true> {
```
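`ImageLayerInner::filter()` then passes each blob through raw instead of decoding it with `meta.read()` and re-storing it via `put_image`: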
```diff
@@ -559,11 +559,12 @@ impl ImageLayerInner {
             let view = BufView::new_slice(&blobs_buf.buf);
 
             for meta in blobs_buf.blobs.iter() {
-                let img_buf = meta.read(&view).await?;
-
+                // Just read the raw header+data and pass it through to the target layer, without
+                // decoding and recompressing it.
+                let raw = meta.raw_with_header(&view);
                 key_count += 1;
                 writer
-                    .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
+                    .put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
                     .await
                     .context(format!("Storing key {}", meta.meta.key))?;
             }
```
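The inner image layer writer gets a matching `put_image_raw` method, which appends the raw blob via `write_blob_raw` and records its offset in the layer's index without touching the payload: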
```diff
@@ -853,6 +854,41 @@ impl ImageLayerWriterInner {
         Ok(())
     }
 
+    ///
+    /// Write the next image to the file, as a raw blob header and data.
+    ///
+    /// The page versions must be appended in blknum order.
+    ///
+    async fn put_image_raw(
+        &mut self,
+        key: Key,
+        raw_with_header: Bytes,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        ensure!(self.key_range.contains(&key));
+
+        // NB: we don't update the (un)compressed metrics, since we can't determine them without
+        // decompressing the image. This seems okay.
+        self.num_keys += 1;
+
+        let (_, res) = self
+            .blob_writer
+            .write_blob_raw(raw_with_header.slice_len(), ctx)
+            .await;
+        let offset = res?;
+
+        let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
+        key.write_to_byte_slice(&mut keybuf);
+        self.tree.append(&keybuf, offset)?;
+
+        #[cfg(feature = "testing")]
+        {
+            self.last_written_key = key;
+        }
+
+        Ok(())
+    }
+
     ///
     /// Finish writing the image layer.
     ///
```
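Since raw pass-through images bypass the compression accounting, `COMPRESSION_IMAGE_OUTPUT_BYTES` is now only incremented when the writer actually processed images through the normal, metered path: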
```diff
@@ -888,7 +924,13 @@ impl ImageLayerWriterInner {
         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
             .inc_by(self.uncompressed_bytes_eligible);
         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
-        crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
+
+        // NB: filter() may pass through raw pages from a different layer, without looking at
+        // whether these are compressed or not. We don't track metrics for these, so avoid
+        // increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
+        if self.uncompressed_bytes > 0 {
+            crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
+        };
 
         let mut file = self.blob_writer.into_inner();
 
```
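A public `ImageLayerWriter::put_image_raw` wrapper exposes the new path to callers such as `filter()`: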
```diff
@@ -1034,6 +1076,25 @@ impl ImageLayerWriter {
         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     }
 
+    ///
+    /// Write the next value to the file, as a raw header and data. This allows passing through a
+    /// raw, potentially compressed image from a different layer file without recompressing it.
+    ///
+    /// The page versions must be appended in blknum order.
+    ///
+    pub async fn put_image_raw(
+        &mut self,
+        key: Key,
+        raw_with_header: Bytes,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        self.inner
+            .as_mut()
+            .unwrap()
+            .put_image_raw(key, raw_with_header, ctx)
+            .await
+    }
+
     /// Estimated size of the image layer.
     pub(crate) fn estimated_size(&self) -> u64 {
         let inner = self.inner.as_ref().unwrap();
```
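Finally, `VectoredBlob::raw_with_header` is no longer unused, so its `#[allow(unused)]` attribute is dropped: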
```diff
@@ -151,7 +151,6 @@ impl VectoredBlob {
     }
 
     /// Returns the raw blob including header.
-    #[allow(unused)]
     pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
         buf.view(self.header_start..self.end)
     }
```