Add a pagectl tool to recompress image layers

commit 8745c0d6f2 (parent 6c6a7f9ace)
Author: Arpad Müller
Date: 2024-05-24 03:00:06 +02:00

8 changed files with 226 additions and 18 deletions

Cargo.lock (generated)

@@ -2946,6 +2946,15 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3612,6 +3621,7 @@ dependencies = [
"hyper 0.14.26",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

Cargo.toml

@@ -110,6 +110,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }

libs/pageserver_api/src/models.rs

@@ -455,6 +455,24 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
enum_map::Enum,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
Zstd,
LZ4,
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
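Thanks to the `strum_macros::EnumString` derive with kebab-case serialization, the new enum parses from the lowercased variant names, which is what the `image_compression` config handling added below relies on. A minimal sketch (assuming the import path above):

use std::str::FromStr;

use pageserver_api::models::ImageCompressionAlgorithm;

fn main() {
    // strum's kebab-case rule lowercases the variant names,
    // so the accepted spellings are "zstd" and "lz4".
    assert_eq!(
        ImageCompressionAlgorithm::from_str("zstd").unwrap(),
        ImageCompressionAlgorithm::Zstd
    );
    assert_eq!(
        ImageCompressionAlgorithm::from_str("lz4").unwrap(),
        ImageCompressionAlgorithm::LZ4
    );
}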

pageserver/Cargo.toml

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

pageserver/ctl/src/layers.rs

@@ -55,6 +55,10 @@ pub(crate) enum LayerCmd {
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
Compress {
dest_path: Utf8PathBuf,
layer_file_path: Utf8PathBuf,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -240,5 +244,22 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
LayerCmd::Compress {
dest_path,
layer_file_path,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let stats =
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
println!(
"Statistics: {stats:#?}\n{}",
serde_json::to_string(&stats).unwrap()
);
return Ok(());
}
}
}
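Given the clap derive above, the new subcommand should be invocable as something like `pagectl layer compress <DEST_PATH> <LAYER_FILE_PATH>` (the `layer` prefix is an assumption based on the existing `LayerCmd` dispatch): it rewrites the given image layer once per algorithm (uncompressed, zstd, lz4) under `dest_path` and prints the resulting sizes both as debug output and as JSON.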

pageserver/src/config.rs

@@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
@@ -55,6 +55,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
@@ -95,6 +96,8 @@ pub mod defaults {
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -290,6 +293,8 @@ pub struct PageServerConf {
pub validate_vectored_get: bool,
pub image_compression: Option<ImageCompressionAlgorithm>,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
}
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
}
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
self.image_compression = BuilderValue::Set(value);
}
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
}
CUSTOM LOGIC
@@ -1026,6 +1039,9 @@ impl PageServerConf {
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
@@ -1110,6 +1126,7 @@ impl PageServerConf {
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
}
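Since `image_compression` is read via `parse_toml_from_str` and the strum kebab-case names, enabling it should amount to a one-line `pageserver.toml` entry along the lines of `image_compression = "zstd"` (or `"lz4"`); when the key is absent, `DEFAULT_IMAGE_COMPRESSION` (`None`) applies and image layer blobs keep being written uncompressed.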

pageserver/src/tenant/blob_io.rs

@@ -12,6 +12,8 @@
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
@@ -219,6 +221,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_compressed(srcbuf, ctx, None).await
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -226,29 +239,65 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let mut compressed_buf = None;
let ((io_buf, hdr_res), srcbuf) = async {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
(
self.write_all(io_buf, ctx).await,
srcbuf.slice(..).into_inner(),
)
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
return (
(
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf.slice(..).into_inner(),
);
}
if len > 0x0fff_ffff {
tracing::warn!("writing blob above future limit ({len} bytes)");
}
const UNCOMPRESSED: u8 = 0x80;
const ZSTD: u8 = UNCOMPRESSED | 0x10;
const LZ4: u8 = UNCOMPRESSED | 0x20;
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ImageCompressionAlgorithm::Zstd) => {
let mut encoder =
async_compression::tokio::write::ZstdEncoder::new(Vec::new());
let slice = srcbuf.slice(..);
encoder.write_all(&slice[..]).await.unwrap();
encoder.flush().await.unwrap();
let compressed = encoder.into_inner();
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(ZSTD, compressed_len, slice.into_inner())
} else {
(0x80, len, slice.into_inner())
}
}
Some(ImageCompressionAlgorithm::LZ4) => {
let slice = srcbuf.slice(..);
let compressed = lz4_flex::block::compress(&slice[..]);
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(LZ4, compressed_len, slice.into_inner())
} else {
(0x80, len, slice.into_inner())
}
}
None => (0x80, len, srcbuf.slice(..).into_inner()),
};
let mut len_buf = (len_written as u32).to_be_bytes();
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
(self.write_all(io_buf, ctx).await, srcbuf)
}
}
.await;
@@ -257,7 +306,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| offset))
}
}
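For orientation: with this change the top four bits of a long blob header carry the compression discriminant (`0x80` uncompressed, `0x90` zstd, `0xa0` lz4), leaving 28 bits for the length, which is why the writer warns above the future `0x0fff_ffff` limit. The read side is not part of this diff; a hypothetical decoder for the 4-byte header written above could look like this:

/// Hypothetical helper, not from this commit: split a 4-byte "long blob"
/// header into its compression discriminant and payload length. Only
/// valid for blobs written below the future 0x0fff_ffff limit.
fn parse_long_blob_header(hdr: [u8; 4]) -> (u8, usize) {
    // 0x80 = uncompressed, 0x80 | 0x10 = zstd, 0x80 | 0x20 = lz4.
    let discriminant = hdr[0] & 0xf0;
    // The remaining 28 bits hold the big-endian length of the
    // (possibly compressed) payload that follows on disk.
    let len = u32::from_be_bytes([hdr[0] & 0x0f, hdr[1], hdr[2], hdr[3]]) as usize;
    (discriminant, len)
}

fn main() {
    // Header 0x90 0x00 0x00 0x10: a zstd-compressed payload of 16 bytes.
    assert_eq!(parse_long_blob_header([0x90, 0x00, 0x00, 0x10]), (0x90, 16));
}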

pageserver/src/tenant/storage_layer/image_layer.rs

@@ -46,7 +46,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
@@ -366,6 +366,83 @@ impl ImageLayer {
res?;
Ok(())
}
pub async fn compression_statistics(
dest_repo_path: &Utf8Path,
path: &Utf8Path,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64)>> {
fn make_conf(
image_compression: Option<ImageCompressionAlgorithm>,
dest_repo_path: &Utf8Path,
) -> &'static PageServerConf {
let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
conf.image_compression = image_compression;
Box::leak(Box::new(conf))
}
let image_compressions = [
None,
Some(ImageCompressionAlgorithm::Zstd),
Some(ImageCompressionAlgorithm::LZ4),
];
let mut stats = Vec::new();
for image_compression in image_compressions {
let size = Self::compressed_size_for_conf(
path,
ctx,
make_conf(image_compression, dest_repo_path),
)
.await?;
stats.push((image_compression, size));
}
Ok(stats)
}
async fn compressed_size_for_conf(
path: &Utf8Path,
ctx: &RequestContext,
conf: &'static PageServerConf,
) -> anyhow::Result<u64> {
let file =
VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}
let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
&block_reader,
);
let mut key_offset_stream =
std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
let mut writer = ImageLayerWriter::new(
conf,
summary.timeline_id,
TenantShardId::unsharded(summary.tenant_id),
&summary.key_range,
summary.lsn,
ctx,
)
.await?;
let cursor = block_reader.block_cursor();
while let Some(r) = key_offset_stream.next().await {
let (key, offset) = r?;
let key = Key::from_slice(&key);
let content = cursor.read_blob(offset, ctx).await?;
writer.put_image(key, content.into(), ctx).await?;
}
Ok(writer.size())
}
}
impl ImageLayerInner {
@@ -782,7 +859,10 @@ impl ImageLayerWriterInner {
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self
.blob_writer
.write_blob_compressed(img, ctx, self.conf.image_compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
@@ -923,6 +1003,12 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
/// Obtains the current size of the file
pub(crate) fn size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}
///
/// Finish writing the image layer.
///