From 8745c0d6f22f7a1b91ed5e6141c8653b3955c4ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 24 May 2024 03:00:06 +0200
Subject: [PATCH] Add a pagectl tool to recompress image layers

---
 Cargo.lock                                      | 10 +++
 Cargo.toml                                      |  1 +
 libs/pageserver_api/src/models.rs               | 18 ++++
 pageserver/Cargo.toml                           |  1 +
 pageserver/ctl/src/layers.rs                    | 21 +++++
 pageserver/src/config.rs                        | 19 +++-
 pageserver/src/tenant/blob_io.rs                | 84 ++++++++++++-----
 .../src/tenant/storage_layer/image_layer.rs     | 90 +++++++++++++++++-
 8 files changed, 226 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1c8a8b0c0f..9403044b3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2946,6 +2946,15 @@ dependencies = [
  "hashbrown 0.14.5",
 ]
 
+[[package]]
+name = "lz4_flex"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
+dependencies = [
+ "twox-hash",
+]
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -3612,6 +3621,7 @@ dependencies = [
  "hyper 0.14.26",
  "itertools",
  "leaky-bucket",
+ "lz4_flex",
  "md5",
  "metrics",
  "nix 0.27.1",
diff --git a/Cargo.toml b/Cargo.toml
index 8fddaaef12..830bd7cb80 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,7 @@ jsonwebtoken = "9"
 lasso = "0.7"
 leaky-bucket = "1.0.1"
 libc = "0.2"
+lz4_flex = "0.11"
 md5 = "0.7.0"
 measured = { version = "0.0.21", features=["lasso"] }
 measured-process = { version = "0.0.21" }
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 9311dab33c..318dd307d7 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -455,6 +455,24 @@ pub enum CompactionAlgorithm {
     Tiered,
 }
 
+#[derive(
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    Serialize,
+    Deserialize,
+    strum_macros::FromRepr,
+    strum_macros::EnumString,
+    enum_map::Enum,
+)]
+#[strum(serialize_all = "kebab-case")]
+pub enum ImageCompressionAlgorithm {
+    Zstd,
+    LZ4,
+}
+
 #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
 pub struct CompactionAlgorithmSettings {
     pub kind: CompactionAlgorithm,
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 4335f38f1e..2c0dede89c 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
 leaky-bucket.workspace = true
+lz4_flex.workspace = true
 md5.workspace = true
 nix.workspace = true
 # hack to get the number of worker threads tokio uses
diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs
index 3611b0baab..340718ef1a 100644
--- a/pageserver/ctl/src/layers.rs
+++ b/pageserver/ctl/src/layers.rs
@@ -55,6 +55,10 @@ pub(crate) enum LayerCmd {
         #[clap(long)]
         new_timeline_id: Option<TimelineId>,
     },
+    Compress {
+        dest_path: Utf8PathBuf,
+        layer_file_path: Utf8PathBuf,
+    },
 }
 
 async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -240,5 +244,22 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
             anyhow::bail!("not an image or delta layer: {layer_file_path}");
         }
+        LayerCmd::Compress {
+            dest_path,
+            layer_file_path,
+        } => {
+            pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+            pageserver::page_cache::init(100);
+
+            let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
+
+            let stats =
+                ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
+            println!(
+                "Statistics: {stats:#?}\n{}",
+                serde_json::to_string(&stats).unwrap()
+            );
+            return Ok(());
+        }
     }
 }
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index b4a0d1ac02..a7da4f55be 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -5,7 +5,7 @@
 //! See also `settings.md` for better description on every parameter.
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
-use pageserver_api::shard::TenantShardId;
+use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
 use remote_storage::{RemotePath, RemoteStorageConfig};
 use serde;
 use serde::de::IntoDeserializer;
@@ -55,6 +55,7 @@ pub mod defaults {
         DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
         DEFAULT_PG_LISTEN_PORT,
     };
+    use pageserver_api::models::ImageCompressionAlgorithm;
    pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
 
     pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
@@ -95,6 +96,8 @@ pub mod defaults {
 
     pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
 
+    pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
+
     pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
 
     pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -290,6 +293,8 @@ pub struct PageServerConf {
 
     pub validate_vectored_get: bool,
 
+    pub image_compression: Option<ImageCompressionAlgorithm>,
+
     /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
     /// is exceeded, we start proactively closing ephemeral layers to limit the total amount
     /// of ephemeral data.
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
 
     validate_vectored_get: BuilderValue<bool>,
 
+    image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
+
     ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
 }
 
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
             max_vectored_read_bytes: Set(MaxVectoredReadBytes(
                 NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
             )),
+            image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
             validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
             ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
         }
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
         self.validate_vectored_get = BuilderValue::Set(value);
     }
 
+    pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
+        self.image_compression = BuilderValue::Set(value);
+    }
+
     pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
         self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
     }
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
             get_impl,
             max_vectored_read_bytes,
             validate_vectored_get,
+            image_compression,
             ephemeral_bytes_per_memory_kb,
         }
         CUSTOM LOGIC
@@ -1026,6 +1039,9 @@ impl PageServerConf {
                 "validate_vectored_get" => {
                     builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
                 }
+                "image_compression" => {
+                    builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
+                }
                 "ephemeral_bytes_per_memory_kb" => {
                     builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
                 }
@@ -1110,6 +1126,7 @@ impl PageServerConf {
                 NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
                     .expect("Invalid default constant"),
             ),
+            image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
             validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
             ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
         }
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 2be8816cef..919b1a5162 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -12,6 +12,8 @@
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
 use bytes::{BufMut, BytesMut};
+use pageserver_api::models::ImageCompressionAlgorithm;
+use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
 
 use crate::context::RequestContext;
@@ -219,6 +221,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         &mut self,
         srcbuf: B,
         ctx: &RequestContext,
+    ) -> (B::Buf, Result<u64, Error>) {
+        self.write_blob_compressed(srcbuf, ctx, None).await
+    }
+
+    /// Write a blob of data. Returns the offset that it was written to,
+    /// which can be used to retrieve the data later.
+    pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &mut self,
+        srcbuf: B,
+        ctx: &RequestContext,
+        algorithm: Option<ImageCompressionAlgorithm>,
     ) -> (B::Buf, Result<u64, Error>) {
         let offset = self.offset;
 
@@ -226,29 +239,65 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 
         let mut io_buf = self.io_buf.take().expect("we always put it back below");
         io_buf.clear();
-        let (io_buf, hdr_res) = async {
+        let mut compressed_buf = None;
+        let ((io_buf, hdr_res), srcbuf) = async {
             if len < 128 {
                 // Short blob. Write a 1-byte length header
                 io_buf.put_u8(len as u8);
-                self.write_all(io_buf, ctx).await
+                (
+                    self.write_all(io_buf, ctx).await,
+                    srcbuf.slice(..).into_inner(),
+                )
             } else {
                 // Write a 4-byte length header
-                if len > 0x7fff_ffff {
+                if len > 0x0fff_ffff {
                     return (
-                        io_buf,
-                        Err(Error::new(
-                            ErrorKind::Other,
-                            format!("blob too large ({len} bytes)"),
-                        )),
+                        (
+                            io_buf,
+                            Err(Error::new(
+                                ErrorKind::Other,
+                                format!("blob too large ({len} bytes)"),
+                            )),
+                        ),
+                        srcbuf.slice(..).into_inner(),
                     );
                 }
-                if len > 0x0fff_ffff {
-                    tracing::warn!("writing blob above future limit ({len} bytes)");
-                }
-                let mut len_buf = (len as u32).to_be_bytes();
-                len_buf[0] |= 0x80;
+                const UNCOMPRESSED: u8 = 0x80;
+                const ZSTD: u8 = UNCOMPRESSED | 0x10;
+                const LZ4: u8 = UNCOMPRESSED | 0x20;
+                let (high_bit_mask, len_written, srcbuf) = match algorithm {
+                    Some(ImageCompressionAlgorithm::Zstd) => {
+                        let mut encoder =
+                            async_compression::tokio::write::ZstdEncoder::new(Vec::new());
+                        let slice = srcbuf.slice(..);
+                        encoder.write_all(&slice[..]).await.unwrap();
+                        encoder.shutdown().await.unwrap();
+                        let compressed = encoder.into_inner();
+                        if compressed.len() < len {
+                            let compressed_len = compressed.len();
+                            compressed_buf = Some(compressed);
+                            (ZSTD, compressed_len, slice.into_inner())
+                        } else {
+                            (0x80, len, slice.into_inner())
+                        }
+                    }
+                    Some(ImageCompressionAlgorithm::LZ4) => {
+                        let slice = srcbuf.slice(..);
+                        let compressed = lz4_flex::block::compress(&slice[..]);
+                        if compressed.len() < len {
+                            let compressed_len = compressed.len();
+                            compressed_buf = Some(compressed);
+                            (LZ4, compressed_len, slice.into_inner())
+                        } else {
+                            (0x80, len, slice.into_inner())
+                        }
+                    }
+                    None => (0x80, len, srcbuf.slice(..).into_inner()),
+                };
+                let mut len_buf = (len_written as u32).to_be_bytes();
+                len_buf[0] |= high_bit_mask;
                 io_buf.extend_from_slice(&len_buf[..]);
-                self.write_all(io_buf, ctx).await
+                (self.write_all(io_buf, ctx).await, srcbuf)
             }
         }
         .await;
@@ -257,7 +306,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
             Ok(_) => (),
             Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
         }
-        let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
+        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
+            let (_buf, res) = self.write_all(compressed_buf, ctx).await;
+            (Slice::into_inner(srcbuf.slice(..)), res)
+        } else {
+            self.write_all(srcbuf, ctx).await
+        };
         (srcbuf, res.map(|_| offset))
     }
 }
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 06e2f09384..193012d8ff 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -46,7 +46,7 @@ use camino::{Utf8Path, Utf8PathBuf};
 use hex;
 use itertools::Itertools;
 use pageserver_api::keyspace::KeySpace;
-use pageserver_api::models::LayerAccessKind;
+use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
@@ -366,6 +366,83 @@ impl ImageLayer {
         res?;
         Ok(())
     }
+    pub async fn compression_statistics(
+        dest_repo_path: &Utf8Path,
+        path: &Utf8Path,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64)>> {
+        fn make_conf(
+            image_compression: Option<ImageCompressionAlgorithm>,
+            dest_repo_path: &Utf8Path,
+        ) -> &'static PageServerConf {
+            let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
+            conf.image_compression = image_compression;
+            Box::leak(Box::new(conf))
+        }
+        let image_compressions = [
+            None,
+            Some(ImageCompressionAlgorithm::Zstd),
+            Some(ImageCompressionAlgorithm::LZ4),
+        ];
+        let mut stats = Vec::new();
+        for image_compression in image_compressions {
+            let size = Self::compressed_size_for_conf(
+                path,
+                ctx,
+                make_conf(image_compression, dest_repo_path),
+            )
+            .await?;
+            stats.push((image_compression, size));
+        }
+        Ok(stats)
+    }
+
+    async fn compressed_size_for_conf(
+        path: &Utf8Path,
+        ctx: &RequestContext,
+        conf: &'static PageServerConf,
+    ) -> anyhow::Result<u64> {
+        let file =
+            VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
+                .await
+                .with_context(|| format!("Failed to open file '{}'", path))?;
+
+        let file_id = page_cache::next_file_id();
+        let block_reader = FileBlockReader::new(&file, file_id);
+        let summary_blk = block_reader.read_blk(0, ctx).await?;
+        let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
+        if summary.magic != IMAGE_FILE_MAGIC {
+            anyhow::bail!("magic file mismatch");
+        }
+
+        let tree_reader = DiskBtreeReader::new(
+            summary.index_start_blk,
+            summary.index_root_blk,
+            &block_reader,
+        );
+
+        let mut key_offset_stream =
+            std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
+
+        let mut writer = ImageLayerWriter::new(
+            conf,
+            summary.timeline_id,
+            TenantShardId::unsharded(summary.tenant_id),
+            &summary.key_range,
+            summary.lsn,
+            ctx,
+        )
+        .await?;
+
+        let cursor = block_reader.block_cursor();
+        while let Some(r) = key_offset_stream.next().await {
+            let (key, offset) = r?;
+            let key = Key::from_slice(&key);
+            let content = cursor.read_blob(offset, ctx).await?;
+            writer.put_image(key, content.into(), ctx).await?;
+        }
+        Ok(writer.size())
+    }
 }
 
 impl ImageLayerInner {
@@ -782,7 +859,10 @@ impl ImageLayerWriterInner {
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
+        let (_img, res) = self
+            .blob_writer
+            .write_blob_compressed(img, ctx, self.conf.image_compression)
+            .await;
         // TODO: re-use the buffer for `img` further upstack
         let off = res?;
 
@@ -923,6 +1003,12 @@ impl ImageLayerWriter {
         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     }
 
+    /// Obtains the current size of the file
+    pub(crate) fn size(&self) -> u64 {
+        let inner = self.inner.as_ref().unwrap();
+        inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
+    }
+
     ///
     /// Finish writing the image layer.
     ///
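
Note on the blob header format introduced in blob_io.rs above: blobs shorter than 128 bytes keep the 1-byte header (top bit clear), while longer blobs get a 4-byte big-endian header whose top nibble now records the compression choice (0x80 uncompressed, 0x90 = 0x80 | 0x10 zstd, 0xA0 = 0x80 | 0x20 lz4) and whose low 28 bits hold the length of the bytes that follow on disk; that is why the hard limit drops from 0x7fff_ffff to 0x0fff_ffff. The following is a minimal decoding sketch for illustration only; it is not part of the patch, and the helper name decode_len_header is made up:

    // Sketch only: mirrors the UNCOMPRESSED/ZSTD/LZ4 constants added in write_blob_compressed.
    fn decode_len_header(hdr: [u8; 4]) -> (Option<&'static str>, u32) {
        // Blobs shorter than 128 bytes use a single header byte with the top bit clear.
        assert!((hdr[0] & 0x80) != 0, "not a 4-byte header");
        let algorithm = match hdr[0] & 0xf0 {
            0x80 => None,         // stored uncompressed
            0x90 => Some("zstd"), // 0x80 | 0x10
            0xa0 => Some("lz4"),  // 0x80 | 0x20
            bits => panic!("unknown compression bits {bits:#x}"),
        };
        // The remaining 28 bits carry the on-disk payload length.
        let len = u32::from_be_bytes([hdr[0] & 0x0f, hdr[1], hdr[2], hdr[3]]);
        (algorithm, len)
    }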
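
Usage notes (assumptions, not spelled out by the patch itself): given the strum kebab-case serialization on ImageCompressionAlgorithm, the new pageserver.toml option would presumably be written as image_compression = "zstd" or image_compression = "lz4", with compression disabled when the option is absent. The new tool should be reachable as `pagectl layer compress <dest_path> <layer_file_path>`, since clap derives the subcommand name from the Compress variant and takes the two paths positionally in declaration order; it re-encodes the given image layer under <dest_path> (used as a scratch repo directory via a dummy PageServerConf) once for each of no compression, zstd and lz4, and prints the resulting sizes both as a debug dump and as JSON.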