diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 2e6c985158..c3cd337d0b 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -237,7 +237,11 @@ impl BlobWriter { (src_buf, Ok(())) } - pub async fn write_compressed_blob(&mut self, srcbuf: Bytes, compress: bool) -> Result { + pub async fn write_compressed_blob( + &mut self, + srcbuf: Bytes, + compress: bool, + ) -> Result { let offset = self.offset; let len = srcbuf.len(); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b7132ee3bf..c9e153536f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -966,7 +966,7 @@ impl DeltaLayerInner { // track when a key is done. for read in reads.into_iter().rev() { let res = vectored_blob_reader - .read_blobs(&read, buf.take().expect("Should have a buffer")) + .read_blobs(&read, buf.take().expect("Should have a buffer"), false) .await; let blobs_buf = match res { diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c4d30cf1e3..51cac59b26 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -40,7 +40,8 @@ use crate::tenant::vectored_blob_io::{ use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{ - COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX, + COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, LZ4_COMPRESSION, STORAGE_FORMAT_VERSION, + TEMP_FILE_SUFFIX, }; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; @@ -49,6 +50,7 @@ use hex; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; +use postgres_ffi::BLCKSZ; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::File; @@ -546,9 +548,12 @@ impl ImageLayerInner { .into(); let vectored_blob_reader = VectoredBlobReader::new(&self.file); + let compressed = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION; for read in reads.into_iter() { let buf = BytesMut::with_capacity(max_vectored_read_bytes); - let res = vectored_blob_reader.read_blobs(&read, buf).await; + let res = vectored_blob_reader + .read_blobs(&read, buf, compressed) + .await; match res { Ok(blobs_buf) => { @@ -556,11 +561,31 @@ impl ImageLayerInner { for meta in blobs_buf.blobs.iter() { let img_buf = frozen_buf.slice(meta.start..meta.end); - reconstruct_state.update_key( - &meta.meta.key, - self.lsn, - Value::Image(img_buf), - ); + if meta.compression_alg == LZ4_COMPRESSION { + match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) { + Ok(decompressed) => { + reconstruct_state.update_key( + &meta.meta.key, + self.lsn, + Value::Image(Bytes::from(decompressed)), + ); + } + Err(err) => reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::from(anyhow!( + "Failed to decompress blob from file {}: {}", + self.file.path, + err + )), + ), + } + } else { + reconstruct_state.update_key( + &meta.meta.key, + self.lsn, + Value::Image(img_buf), + ); + } } } Err(err) => { @@ -668,7 +693,10 @@ impl ImageLayerWriterInner { /// async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let off = self.blob_writer.write_compressed_blob(img, self.compression).await?; + let off = self + .blob_writer + .write_compressed_blob(img, self.compression) + .await?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); self.tree.append(&keybuf, off)?; diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 805f70b23b..17e618b4e0 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -15,11 +15,11 @@ //! //! Note that the vectored blob api does *not* go through the page cache. -use std::collections::BTreeMap; -use std::num::NonZeroUsize; - +use crate::NO_COMPRESSION; use bytes::BytesMut; use pageserver_api::key::Key; +use std::collections::BTreeMap; +use std::num::NonZeroUsize; use utils::lsn::Lsn; use utils::vec_map::VecMap; @@ -40,6 +40,7 @@ pub struct VectoredBlob { pub start: usize, pub end: usize, pub meta: BlobMeta, + pub compression_alg: u8, } /// Return type of [`VectoredBlobReader::read_blobs`] @@ -274,6 +275,7 @@ impl<'a> VectoredBlobReader<'a> { &self, read: &VectoredRead, buf: BytesMut, + new_storage_format: bool, ) -> Result { assert!(read.size() > 0); assert!( @@ -304,35 +306,42 @@ impl<'a> VectoredBlobReader<'a> { ); for ((offset, meta), next) in pairs { - let offset_in_buf = offset - start_offset; - let first_len_byte = buf[offset_in_buf as usize]; + let mut offset_in_buf = (offset - start_offset) as usize; + let compression_alg = if new_storage_format { + offset_in_buf += 1; + buf[offset_in_buf - 1] + } else { + NO_COMPRESSION + }; + let first_len_byte = buf[offset_in_buf]; // Each blob is prefixed by a header containing it's size. // Extract the size and skip that header to find the start of the data. // The size can be 1 or 4 bytes. The most significant bit is 0 in the // 1 byte case and 1 in the 4 byte case. let (size_length, blob_size) = if first_len_byte < 0x80 { - (1, first_len_byte as u64) + (1usize, first_len_byte as usize) } else { let mut blob_size_buf = [0u8; 4]; - let offset_in_buf = offset_in_buf as usize; - blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]); blob_size_buf[0] &= 0x7f; - (4, u32::from_be_bytes(blob_size_buf) as u64) + (4usize, u32::from_be_bytes(blob_size_buf) as usize) }; let start = offset_in_buf + size_length; let end = match next { - Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset, + Some((next_blob_start_offset, _)) => { + (next_blob_start_offset - start_offset) as usize + } None => start + blob_size, }; assert_eq!(end - start, blob_size); metas.push(VectoredBlob { - start: start as usize, - end: end as usize, + start, + end, + compression_alg, meta: *meta, }) }