diff --git a/Cargo.lock b/Cargo.lock index 7fd9053f62..db206dd0c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2841,6 +2841,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lz4_flex" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" +dependencies = [ + "twox-hash", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -3511,6 +3520,7 @@ dependencies = [ "hyper", "itertools", "leaky-bucket", + "lz4_flex", "md5", "metrics", "nix 0.27.1", diff --git a/Cargo.toml b/Cargo.toml index 76f4ff041c..04178987e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ jsonwebtoken = "9" lasso = "0.7" leaky-bucket = "1.0.1" libc = "0.2" +lz4_flex = "0.11.1" md5 = "0.7.0" memoffset = "0.8" native-tls = "0.2" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 5adeaffe1a..a01a50dc52 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,6 +37,7 @@ humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true leaky-bucket.workspace = true +lz4_flex.workspace = true md5.workspace = true nix.workspace = true # hack to get the number of worker threads tokio uses diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 02a690d4e1..4a688d36d4 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -40,7 +40,14 @@ use tracing::info; /// format, bump this! /// Note that TimelineMetadata uses its own version number to track /// backwards-compatible changes to the metadata format. 
-pub const STORAGE_FORMAT_VERSION: u16 = 3;
+pub const STORAGE_FORMAT_VERSION: u16 = 4;
+
+/// Minimal storage format version with compression support
+pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
+
+/// Page image compression algorithm tag: the first byte of every compressed-format blob.
+pub const NO_COMPRESSION: u8 = 0;
+pub const LZ4_COMPRESSION: u8 = 1;
 
 pub const DEFAULT_PG_VERSION: u32 = 15;
 
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 0d33100ead..4ba68bac1f 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -11,13 +11,16 @@
 //! len < 128: 0XXXXXXX
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
 
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::tenant::block_io::BlockCursor;
 use crate::virtual_file::VirtualFile;
+use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
+use lz4_flex;
+use postgres_ffi::BLCKSZ;
 use std::cmp::min;
 use std::io::{Error, ErrorKind};
 
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
         Ok(buf)
     }
+    /// Read a blob stored by `BlobWriter::write_compressed_blob`.
+    ///
+    /// The byte at `offset` is the compression tag and a regular blob follows it.
+    /// LZ4-tagged blobs are decompressed; an unknown tag is reported as an error.
+    pub async fn read_compressed_blob(
+        &self,
+        offset: u64,
+        ctx: &RequestContext,
+    ) -> Result<Vec<u8>, std::io::Error> {
+        let blknum = (offset / PAGE_SZ as u64) as u32;
+        let off = (offset % PAGE_SZ as u64) as usize;
+        let buf = self.read_blk(blknum, ctx).await?;
+        let compression_alg = buf[off];
+        let res = self.read_blob(offset + 1, ctx).await?;
+        match compression_alg {
+            NO_COMPRESSION => Ok(res),
+            LZ4_COMPRESSION => lz4_flex::block::decompress(&res, BLCKSZ as usize)
+                .map_err(|_| Error::new(ErrorKind::InvalidData, "decompress error")),
+            // File contents are untrusted: fail the read instead of panicking.
+            _ => Err(Error::new(ErrorKind::InvalidData, "unknown compression algorithm")),
+        }
+    }
+
     /// Read blob into the given buffer. Any previous contents in the buffer
     /// are overwritten.
pub async fn read_blob_into_buf(
@@ -211,6 +237,58 @@ impl BlobWriter {
         (src_buf, Ok(()))
     }
 
+    /// Write an image blob, LZ4-compressing it when that makes it smaller.
+    ///
+    /// Layout: a 1-byte compression tag, then a regular blob (length header plus
+    /// payload). Only `BLCKSZ`-sized blobs are considered for compression. Returns
+    /// the offset of the tag byte, which is what `read_compressed_blob` expects.
+    pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
+        let offset = self.offset;
+        let len = srcbuf.len();
+        // Validate before taking `io_buf`, so an error cannot leave it as `None`.
+        if len > 0x7fff_ffff {
+            return Err(Error::new(
+                ErrorKind::Other,
+                format!("blob too large ({} bytes)", len),
+            ));
+        }
+        let mut io_buf = self.io_buf.take().expect("we always put it back below");
+        io_buf.clear();
+        let compressed = if len == BLCKSZ as usize {
+            Some(lz4_flex::block::compress(&srcbuf)).filter(|c| c.len() < len)
+        } else {
+            None
+        };
+        if let Some(compressed) = &compressed {
+            io_buf.put_u8(LZ4_COMPRESSION);
+            let mut len_buf = (compressed.len() as u32).to_be_bytes();
+            len_buf[0] |= 0x80;
+            io_buf.extend_from_slice(&len_buf[..]);
+            io_buf.extend_from_slice(&compressed[..]);
+        } else {
+            io_buf.put_u8(NO_COMPRESSION);
+            if len < 128 {
+                // Short blob. Write a 1-byte length header
+                io_buf.put_u8(len as u8);
+            } else {
+                // Write a 4-byte length header
+                let mut len_buf = (len as u32).to_be_bytes();
+                len_buf[0] |= 0x80;
+                io_buf.extend_from_slice(&len_buf[..]);
+            }
+        }
+        let (io_buf, hdr_res) = self.write_all(io_buf).await;
+        // Put the buffer back before looking at the result, even on error.
+        self.io_buf = Some(io_buf);
+        hdr_res?;
+        if compressed.is_none() {
+            // The payload was not copied into `io_buf`; write it out as-is.
+            let (_buf, res) = self.write_all(srcbuf).await;
+            res?;
+        }
+        Ok(offset)
+    }
+
     /// Write a blob of data. Returns the offset that it was written to,
     /// which can be used to retrieve the data later.
     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +305,6 @@ impl BlobWriter {
             if len < 128 {
                 // Short blob.
Write a 1-byte length header io_buf.put_u8(len as u8); - self.write_all(io_buf).await } else { // Write a 4-byte length header if len > 0x7fff_ffff { @@ -242,8 +319,8 @@ impl BlobWriter { let mut len_buf = (len as u32).to_be_bytes(); len_buf[0] |= 0x80; io_buf.extend_from_slice(&len_buf[..]); - self.write_all(io_buf).await } + self.write_all(io_buf).await } .await; self.io_buf = Some(io_buf); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 14c79e413c..0ca356646a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; -use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; +use crate::{ + COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX, +}; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; @@ -153,6 +155,7 @@ pub struct ImageLayerInner { // values copied from summary index_start_blk: u32, index_root_blk: u32, + format_version: u16, lsn: Lsn, @@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner { f.debug_struct("ImageLayerInner") .field("index_start_blk", &self.index_start_blk) .field("index_root_blk", &self.index_root_blk) + .field("format_version", &self.format_version) .finish() } } @@ -408,6 +412,7 @@ impl ImageLayerInner { Ok(Ok(ImageLayerInner { index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, + format_version: actual_summary.format_version, lsn, file, file_id, @@ -436,18 +441,20 @@ impl ImageLayerInner { ) .await? 
{ - let blob = block_reader - .block_cursor() - .read_blob( - offset, - &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::ImageLayerValue) - .build(), - ) - .await - .with_context(|| format!("failed to read value from offset {}", offset))?; - let value = Bytes::from(blob); + let ctx = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::ImageLayerValue) + .build(); + let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION { + block_reader + .block_cursor() + .read_compressed_blob(offset, &ctx) + .await + } else { + block_reader.block_cursor().read_blob(offset, &ctx).await + }) + .with_context(|| format!("failed to read value from offset {}", offset))?; + let value = Bytes::from(blob); reconstruct_state.img = Some((self.lsn, value)); Ok(ValueReconstructResult::Complete) } else { @@ -658,10 +665,7 @@ impl ImageLayerWriterInner { /// async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let (_img, res) = self.blob_writer.write_blob(img).await; - // TODO: re-use the buffer for `img` further upstack - let off = res?; - + let off = self.blob_writer.write_compressed_blob(img).await?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); self.tree.append(&keybuf, off)?;