diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs
index 5b65f97c92..852670af2c 100644
--- a/libs/pageserver_api/src/key.rs
+++ b/libs/pageserver_api/src/key.rs
@@ -577,7 +577,3 @@ mod tests {
         assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
     }
 }
-
-pub fn is_rel_data_key(key: Key) -> bool {
-    key.field1 == 0x00 && key.field4 != 0 && key.field6 != 0xffffffff
-}
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 0d33100ead..970eea511a 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -18,6 +18,9 @@ use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::tenant::block_io::BlockCursor;
 use crate::virtual_file::VirtualFile;
+use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
+use lz4_flex;
+use postgres_ffi::BLCKSZ;
 use std::cmp::min;
 use std::io::{Error, ErrorKind};
 
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
         Ok(buf)
     }
+    /// Read a blob written by `write_compressed_blob`: a one-byte compression tag at
+    /// `offset`, then an ordinary blob. Unknown tags and bad LZ4 data yield `InvalidData`.
+    pub async fn read_compressed_blob(
+        &self,
+        offset: u64,
+        ctx: &RequestContext,
+    ) -> Result<Vec<u8>, std::io::Error> {
+        let blknum = (offset / PAGE_SZ as u64) as u32;
+        let off = (offset % PAGE_SZ as u64) as usize;
+        let buf = self.read_blk(blknum, ctx).await?;
+        let compression_alg = buf[off];
+        let res = self.read_blob(offset + 1, ctx).await?;
+        if compression_alg == LZ4_COMPRESSION {
+            lz4_flex::block::decompress(&res, BLCKSZ as usize)
+                .map_err(|_| Error::new(ErrorKind::InvalidData, "decompress error"))
+        } else if compression_alg == NO_COMPRESSION {
+            Ok(res)
+        } else {
+            // The tag comes from disk: report corruption instead of panicking.
+            Err(Error::new(ErrorKind::InvalidData, "unknown compression tag"))
+        }
+    }
+
     /// Read blob into the given buffer. Any previous contents in the buffer
     /// are overwritten.
pub async fn read_blob_into_buf(
@@ -211,6 +237,59 @@ impl BlobWriter {
         (src_buf, Ok(()))
     }
 
+    /// Write `srcbuf` as a blob preceded by a one-byte compression tag and return
+    /// the offset of that tag, which is the offset `read_compressed_blob` takes.
+    ///
+    /// On-disk layout: tag byte, length header, payload.
+    /// - Inputs shorter than 128 bytes are stored verbatim with a 1-byte length.
+    /// - Inputs of exactly `BLCKSZ` bytes are LZ4-compressed; the compressed form
+    ///   is kept only if it is smaller, and always gets a 4-byte length.
+    /// - Everything else is stored verbatim with a 4-byte length.
+    ///
+    /// A 4-byte length is big-endian with the top bit of its first byte set.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if an uncompressed blob exceeds `0x7fff_ffff` bytes or if
+    /// any underlying write fails.
+    pub async fn write_compressed_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
+        let offset = self.offset;
+        if srcbuf.len() < 128 {
+            self.write_all(&[NO_COMPRESSION]).await?;
+            // Short blob: 1-byte length header (top bit clear because len < 128).
+            self.write_all(&[srcbuf.len() as u8]).await?;
+        } else {
+            // Only whole pages are compressed: the reader decompresses with
+            // `BLCKSZ` as the size bound.
+            if srcbuf.len() == BLCKSZ as usize {
+                let compressed = lz4_flex::block::compress(srcbuf);
+                if compressed.len() < srcbuf.len() {
+                    self.write_all(&[LZ4_COMPRESSION]).await?;
+                    // `compressed.len() < BLCKSZ`, so the top bit is free for the flag.
+                    let mut len_buf = (compressed.len() as u32).to_be_bytes();
+                    len_buf[0] |= 0x80;
+                    self.write_all(&len_buf).await?;
+                    self.write_all(&compressed).await?;
+                    return Ok(offset);
+                }
+            }
+            // The top bit of the length is reserved for the 4-byte-header flag.
+            if srcbuf.len() > 0x7fff_ffff {
+                return Err(Error::new(
+                    ErrorKind::Other,
+                    format!("blob too large ({} bytes)", srcbuf.len()),
+                ));
+            }
+            self.write_all(&[NO_COMPRESSION]).await?;
+            // 4-byte big-endian length header with the top bit set.
+            let mut len_buf = (srcbuf.len() as u32).to_be_bytes();
+            len_buf[0] |= 0x80;
+            self.write_all(&len_buf).await?;
+        }
+        self.write_all(srcbuf).await?;
+        Ok(offset)
+    }
+
     /// Write a blob of data. Returns the offset that it was written to,
     /// which can be used to retrieve the data later.
pub async fn write_blob, Buf: IoBuf + Send>( diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index f9bff850e7..883caac47e 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -37,6 +37,10 @@ use crate::tenant::storage_layer::{ use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner, +use crate::tenant::Timeline; +use crate::virtual_file::VirtualFile; +use crate::{ + COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX, }; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; @@ -50,7 +54,6 @@ use lz4_flex; use pageserver_api::key::is_rel_data_key; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; -use postgres_ffi::BLCKSZ; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::File; @@ -92,10 +95,6 @@ pub struct Summary { pub index_start_blk: u32, /// Block within the 'index', where the B-tree root page is stored pub index_root_blk: u32, - /// Compression algorithm used for relation pages. Compressed size should be always - /// smaller than original size, otherwise original image is stored instead of conpressed image. - /// Old storage versions (format_version < 4) do not have this fields and it is deseriealized as 0=NO+COMPRESSION - pub compression_alg: u8, // the 'values' part starts after the summary header, on block 1. 
} @@ -127,7 +126,6 @@ impl Summary { index_start_blk: 0, index_root_blk: 0, - compression_alg: LZ4_COMPRESSION, } } } @@ -163,7 +161,6 @@ pub struct ImageLayerInner { index_start_blk: u32, index_root_blk: u32, format_version: u16, - compression_alg: u8, lsn: Lsn, @@ -179,7 +176,6 @@ impl std::fmt::Debug for ImageLayerInner { .field("index_start_blk", &self.index_start_blk) .field("index_root_blk", &self.index_root_blk) .field("format_version", &self.format_version) - .field("compression_alg", &self.compression_alg) .finish() } } @@ -404,20 +400,10 @@ impl ImageLayerInner { let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?; - if actual_summary.compression_alg != LZ4_COMPRESSION - && actual_summary.compression_alg != NO_COMPRESSION - { - bail!( - "Unsupported compression algorithm: {}", - actual_summary.compression_alg - ); - } - if let Some(mut expected_summary) = summary { // production code path expected_summary.index_start_blk = actual_summary.index_start_blk; expected_summary.index_root_blk = actual_summary.index_root_blk; - expected_summary.compression_alg = actual_summary.compression_alg; if actual_summary != expected_summary { bail!( @@ -432,7 +418,6 @@ impl ImageLayerInner { index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, format_version: actual_summary.format_version, - compression_alg: actual_summary.compression_alg, lsn, file, file_id, @@ -461,26 +446,17 @@ impl ImageLayerInner { ) .await? 
{
-            let blob = block_reader
-                .block_cursor()
-                .read_blob(
-                    offset,
-                    &RequestContextBuilder::extend(ctx)
-                        .page_content_kind(PageContentKind::ImageLayerValue)
-                        .build(),
-                )
-                .await
-                .with_context(|| format!("failed to read value from offset {}", offset))?;
-
-            let value = if self.compression_alg == LZ4_COMPRESSION
-                && is_rel_data_key(key)
-                && blob.len() < BLCKSZ as usize
-            {
-                let decompressed = lz4_flex::block::decompress(&blob, BLCKSZ as usize)?;
-                Bytes::from(decompressed)
+            let ctx = RequestContextBuilder::extend(ctx)
+                .page_content_kind(PageContentKind::ImageLayerValue)
+                .build();
+            let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
+                file.block_cursor().read_compressed_blob(offset, &ctx).await
             } else {
-                Bytes::from(blob)
-            };
+                file.block_cursor().read_blob(offset, &ctx).await
+            })
+            .with_context(|| format!("failed to read value from offset {}", offset))?;
+
+            let value = Bytes::from(blob);
             reconstruct_state.img = Some((self.lsn, value));
             Ok(ValueReconstructResult::Complete)
         } else {
@@ -691,17 +667,7 @@ impl ImageLayerWriterInner {
     ///
     async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let (_img, res) = if is_rel_data_key(key) {
-            assert_eq!(img.len(), BLCKSZ as usize);
-            let compressed = lz4_flex::block::compress(img);
-            if compressed.len() < img.len() {
-                self.blob_writer.write_blob(&compressed).await;
-            } else {
-                self.blob_writer.write_blob(img).await;
-            }
-        } else {
-            self.blob_writer.write_blob(img).await;
-        };
+        let res = self.blob_writer.write_compressed_blob(&img).await;
         // TODO: re-use the buffer for `img` further upstack
         let off = res?;
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
@@ -739,7 +705,6 @@ impl ImageLayerWriterInner {
             lsn: self.lsn,
             index_start_blk,
             index_root_blk,
-            compression_alg: LZ4_COMPRESSION,
         };
 
         let mut buf = Vec::with_capacity(PAGE_SZ);