Compare commits

...

36 Commits

Author SHA1 Message Date
Arpad Müller
5e32cce2d9 Add script for plots 2024-06-22 18:14:05 +02:00
Arpad Müller
6b67135bd3 Fix offset stream issue 2024-06-18 17:50:47 +02:00
Arpad Müller
66d3bef947 More printing and assertions 2024-06-18 17:50:47 +02:00
Arpad Müller
e749e73ad6 Add the compression algo name 2024-06-18 17:50:47 +02:00
Arpad Müller
c5034f0e45 Fix build 2024-06-18 17:50:47 +02:00
Arpad Müller
d2533c06a5 Always delete the file, even on error 2024-06-18 17:50:47 +02:00
Arpad Müller
0fdeca882c Fix tests 2024-06-18 17:50:47 +02:00
Arpad Müller
f1bebda713 Fix failing test 2024-06-18 17:50:47 +02:00
Arpad Müller
983972f812 Add tests for compression 2024-06-18 17:50:47 +02:00
Arpad Müller
2ea8d1b151 Shutdown instead of flush 2024-06-18 17:50:47 +02:00
Arpad Müller
2f70221503 Don't forget the flush 2024-06-18 17:50:47 +02:00
Arpad Müller
07bd0ce69e Also measure decompression time 2024-06-18 17:50:47 +02:00
Arpad Müller
40e79712eb Add decompression 2024-06-18 17:50:47 +02:00
Arpad Müller
88b24e1593 Move constants out into file 2024-06-18 17:50:47 +02:00
Arpad Müller
8fcdc22283 Add stats info 2024-06-18 17:50:47 +02:00
Arpad Müller
2eb8b428cc Also support the generation-less legacy naming scheme 2024-06-18 17:50:47 +02:00
Arpad Müller
e6a0e7ec61 Add zstd with low compression quality 2024-06-18 17:50:47 +02:00
Arpad Müller
843d996cb1 More precise printing 2024-06-18 17:50:47 +02:00
Arpad Müller
c824ffe1dc Add ZstdHigh compression mode 2024-06-18 17:50:47 +02:00
Arpad Müller
dadbd87ac1 Add percent to output 2024-06-18 17:50:47 +02:00
Arpad Müller
0e667dcd93 more yielding 2024-06-18 17:50:47 +02:00
Arpad Müller
14447b98ce Yield in between 2024-06-18 17:50:47 +02:00
Arpad Müller
8fcb236783 Increase listing limit 2024-06-18 17:50:47 +02:00
Arpad Müller
a9963db8c3 Create timeline dir in temp location if not existent 2024-06-18 17:50:47 +02:00
Arpad Müller
0c500450fe Print error better 2024-06-18 17:50:47 +02:00
Arpad Müller
3182c3361a Corrections 2024-06-18 17:50:47 +02:00
Arpad Müller
80803ff098 Printing tweaks 2024-06-18 17:50:47 +02:00
Arpad Müller
9b74d554b4 Remove generation suffix 2024-06-18 17:50:47 +02:00
Arpad Müller
fce252fb2c Rename dest_path to tmp_dir 2024-06-18 17:50:47 +02:00
Arpad Müller
f132658bd9 Some prints 2024-06-18 17:50:47 +02:00
Arpad Müller
d030cbffec Print number of keys 2024-06-18 17:50:47 +02:00
Arpad Müller
554a6bd4a6 Two separate commands
More easy to have an overview
2024-06-18 17:50:47 +02:00
Arpad Müller
9850794250 Remote layer file after done 2024-06-18 17:50:47 +02:00
Arpad Müller
f5baac2579 clippy 2024-06-18 17:50:47 +02:00
Arpad Müller
2d37db234a Add mode to compare multiple files from a tenant 2024-06-18 17:50:47 +02:00
Arpad Müller
8745c0d6f2 Add a pagectl tool to recompress image layers 2024-06-18 17:50:47 +02:00
9 changed files with 614 additions and 28 deletions

10
Cargo.lock generated
View File

@@ -2946,6 +2946,15 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3612,6 +3621,7 @@ dependencies = [
"hyper 0.14.26",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -110,6 +110,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }

View File

@@ -455,6 +455,26 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
enum_map::Enum,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
ZstdLow,
Zstd,
ZstdHigh,
LZ4,
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -1,4 +1,7 @@
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
@@ -8,7 +11,7 @@ use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
@@ -20,7 +23,12 @@ use pageserver::{
},
virtual_file::VirtualFile,
};
use pageserver_api::models::ImageCompressionAlgorithm;
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig};
use std::fs;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -55,6 +63,17 @@ pub(crate) enum LayerCmd {
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
CompressOne {
dest_path: Utf8PathBuf,
layer_file_path: Utf8PathBuf,
},
CompressMany {
tmp_dir: Utf8PathBuf,
tenant_remote_prefix: String,
tenant_remote_config: String,
layers_dir: Utf8PathBuf,
parallelism: Option<u32>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -240,5 +259,138 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
LayerCmd::CompressOne {
dest_path,
layer_file_path,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let stats =
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
println!(
"Statistics: {stats:#?}\n{}",
serde_json::to_string(&stats).unwrap()
);
Ok(())
}
LayerCmd::CompressMany {
tmp_dir,
tenant_remote_prefix,
tenant_remote_config,
layers_dir,
parallelism,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
let storage = Arc::new(storage);
let cancel = CancellationToken::new();
let path = RemotePath::from_string(tenant_remote_prefix)?;
let max_files = NonZeroU32::new(128_000);
let files_list = storage
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
.await?;
println!("Listing gave {} keys", files_list.keys.len());
tokio::fs::create_dir_all(&layers_dir).await?;
let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
let mut tasks = JoinSet::new();
for (file_idx, file_key) in files_list.keys.iter().enumerate() {
let Some(file_name) = file_key.object_name() else {
continue;
};
match LayerName::from_str(file_name) {
Ok(LayerName::Delta(_)) => continue,
Ok(LayerName::Image(_)) => (),
Err(_e) => {
// Split off the final part. We ensured above that this is not turning a
// generation-less delta layer file name into an image layer file name.
let Some(file_without_generation) = file_name.rsplit_once('-') else {
continue;
};
let Ok(LayerName::Image(_layer_file_name)) =
LayerName::from_str(file_without_generation.0)
else {
// Skipping because it's either not a layer or an image layer
//println!("object {file_name}: not an image layer");
continue;
};
}
}
let json_file_path = layers_dir.join(format!("{file_name}.json"));
if tokio::fs::try_exists(&json_file_path).await? {
//println!("object {file_name}: report already created");
// If we have already created a report for the layer, skip it.
continue;
}
let local_layer_path = layers_dir.join(file_name);
async fn stats(
semaphore: Arc<Semaphore>,
local_layer_path: Utf8PathBuf,
json_file_path: Utf8PathBuf,
tmp_dir: Utf8PathBuf,
storage: Arc<GenericRemoteStorage>,
file_key: RemotePath,
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
{
let _permit = semaphore.acquire().await?;
let cancel = CancellationToken::new();
let download = storage.download(&file_key, &cancel).await?;
let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
println!("Downloaded file to {local_layer_path}");
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let stats =
ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx)
.await?;
let stats_str = serde_json::to_string(&stats).unwrap();
tokio::fs::write(json_file_path, stats_str).await?;
tokio::fs::remove_file(&local_layer_path).await?;
Ok(stats)
}
let semaphore = semaphore.clone();
let file_key = file_key.to_owned();
let storage = storage.clone();
let tmp_dir = tmp_dir.to_owned();
let file_name = file_name.to_owned();
let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64;
tasks.spawn(async move {
let stats = stats(
semaphore,
local_layer_path.to_owned(),
json_file_path.to_owned(),
tmp_dir,
storage,
file_key,
)
.await;
match stats {
Ok(stats) => {
println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n")
}
Err(e) => eprintln!("Error for {file_name}: {e:?}"),
};
});
}
while let Some(_res) = tasks.join_next().await {}
Ok(())
}
}
}

View File

@@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
@@ -55,6 +55,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
@@ -95,6 +96,8 @@ pub mod defaults {
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -290,6 +293,8 @@ pub struct PageServerConf {
pub validate_vectored_get: bool,
pub image_compression: Option<ImageCompressionAlgorithm>,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
}
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
}
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
self.image_compression = BuilderValue::Set(value);
}
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
}
CUSTOM LOGIC
@@ -1026,6 +1039,9 @@ impl PageServerConf {
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
@@ -1110,6 +1126,7 @@ impl PageServerConf {
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
}
@@ -1350,6 +1367,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
},
"Correct defaults should be used when no config values are provided"
@@ -1423,6 +1441,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
},
"Should be able to parse all basic config values correctly"

View File

@@ -11,7 +11,10 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
@@ -66,12 +69,29 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= 0x7f;
len_buf[0] &= 0x0f;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & 0xf0;
dstbuf.clear();
dstbuf.reserve(len);
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 {
buf_to_write = &mut tmp_buf;
Some(dstbuf)
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
};
buf_to_write.clear();
buf_to_write.reserve(len);
// Read the payload
let mut remain = len;
@@ -85,14 +105,38 @@ impl<'a> BlockCursor<'a> {
page_remain = PAGE_SZ;
}
let this_blk_len = min(remain, page_remain);
dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
remain -= this_blk_len;
off += this_blk_len;
}
if let Some(dstbuf) = compression {
if compression_bits == BYTE_ZSTD {
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
decoder.write_all(buf_to_write).await?;
decoder.flush().await?;
} else if compression_bits == BYTE_LZ4 {
let decompressed = lz4_flex::block::decompress_size_prepended(&buf_to_write)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("lz4 decompression error: {e:?}"),
)
})?;
dstbuf.extend_from_slice(&decompressed);
} else {
unreachable!("already checked above")
}
}
Ok(())
}
}
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20;
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
@@ -219,6 +263,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_compressed(srcbuf, ctx, None).await
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -226,29 +281,75 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let (io_buf, hdr_res) = async {
let mut compressed_buf = None;
let ((io_buf, hdr_res), srcbuf) = async {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf, ctx).await
(
self.write_all(io_buf, ctx).await,
srcbuf.slice(..).into_inner(),
)
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
if len > 0x0fff_ffff {
return (
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
(
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf.slice(..).into_inner(),
);
}
if len > 0x0fff_ffff {
tracing::warn!("writing blob above future limit ({len} bytes)");
}
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
use ImageCompressionAlgorithm::*;
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ZstdLow | Zstd | ZstdHigh) => {
let mut encoder = if matches!(algorithm, Some(ZstdLow)) {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(1),
)
} else if matches!(algorithm, Some(ZstdHigh)) {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(6),
)
} else {
async_compression::tokio::write::ZstdEncoder::new(Vec::new())
};
let slice = srcbuf.slice(..);
encoder.write_all(&slice[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
Some(ImageCompressionAlgorithm::LZ4) => {
let slice = srcbuf.slice(..);
let compressed = lz4_flex::block::compress_prepend_size(&slice[..]);
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_LZ4, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()),
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf, ctx).await
(self.write_all(io_buf, ctx).await, srcbuf)
}
}
.await;
@@ -257,7 +358,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| offset))
}
}
@@ -295,6 +401,12 @@ mod tests {
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
}
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
blobs: &[Vec<u8>],
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
@@ -305,7 +417,26 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
let (_, res) = match COMPRESSION {
0 => wtr.write_blob(blob.clone(), &ctx).await,
1 => {
wtr.write_blob_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::ZstdLow),
)
.await
}
2 => {
wtr.write_blob_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::LZ4),
)
.await
}
_ => unreachable!("Invalid compression {COMPRESSION}"),
};
let offs = res?;
offsets.push(offs);
}
@@ -361,10 +492,17 @@ mod tests {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
b"hello".to_vec(),
random_array(66 * PAGE_SZ),
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, 1>(blobs).await?;
round_trip_test_compressed::<true, 1>(blobs).await?;
round_trip_test_compressed::<false, 2>(blobs).await?;
round_trip_test_compressed::<true, 2>(blobs).await?;
Ok(())
}

View File

@@ -46,17 +46,19 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::MetadataExt;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::*;
@@ -366,6 +368,170 @@ impl ImageLayer {
res?;
Ok(())
}
pub async fn compression_statistics(
dest_repo_path: &Utf8Path,
path: &Utf8Path,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>> {
fn make_conf(
image_compression: Option<ImageCompressionAlgorithm>,
dest_repo_path: &Utf8Path,
) -> &'static PageServerConf {
let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
conf.image_compression = image_compression;
Box::leak(Box::new(conf))
}
let image_compressions = [
None,
Some(ImageCompressionAlgorithm::ZstdLow),
Some(ImageCompressionAlgorithm::Zstd),
Some(ImageCompressionAlgorithm::ZstdHigh),
Some(ImageCompressionAlgorithm::LZ4),
];
let confs = image_compressions
.clone()
.map(|compression| make_conf(compression, dest_repo_path));
let mut stats = Vec::new();
for (image_compression, conf) in image_compressions.into_iter().zip(confs) {
let start_compression = Instant::now();
let compressed_path = Self::compress_for_conf(path, ctx, conf).await?;
let path_to_delete = compressed_path.clone();
scopeguard::defer!({
let _ = std::fs::remove_file(path_to_delete);
});
let size = path.metadata()?.size();
let elapsed_ms = start_compression.elapsed().as_millis() as u64;
let start_decompression = Instant::now();
Self::compare_are_equal(path, &compressed_path, ctx, &image_compression).await?;
let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64;
stats.push((
image_compression,
size,
elapsed_ms,
elapsed_decompression_ms,
));
tokio::task::yield_now().await;
}
Ok(stats)
}
async fn compress_for_conf(
path: &Utf8Path,
ctx: &RequestContext,
conf: &'static PageServerConf,
) -> anyhow::Result<Utf8PathBuf> {
let file =
VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}
let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
&block_reader,
);
let mut key_offset_stream =
std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id);
tokio::fs::create_dir_all(timeline_path).await?;
let mut writer = ImageLayerWriter::new(
conf,
summary.timeline_id,
tenant_shard_id,
&summary.key_range,
summary.lsn,
ctx,
)
.await?;
let cursor = block_reader.block_cursor();
while let Some(r) = key_offset_stream.next().await {
let (key, offset) = r?;
let key = Key::from_slice(&key);
let content = cursor.read_blob(offset, ctx).await?;
writer.put_image(key, content.into(), ctx).await?;
}
let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2;
Ok(path)
}
async fn compare_are_equal(
path_a: &Utf8Path,
path_b: &Utf8Path,
ctx: &RequestContext,
cmp: &Option<ImageCompressionAlgorithm>,
) -> anyhow::Result<()> {
let mut files = Vec::new();
for path in [path_a, path_b] {
let file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
files.push(file);
}
let mut readers_summaries = Vec::new();
for file in files.iter() {
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}
readers_summaries.push((block_reader, summary));
}
let mut tree_readers_cursors = Vec::new();
for (block_reader, summary) in readers_summaries.iter() {
let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
block_reader,
);
let cursor = block_reader.block_cursor();
tree_readers_cursors.push((tree_reader, cursor));
}
let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0]
.0
.get_stream_from(&[0u8; KEY_SIZE], ctx));
let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[1]
.0
.get_stream_from(&[0u8; KEY_SIZE], ctx));
while let Some(r) = key_offset_stream_a.next().await {
let (key_a, offset_a): (Vec<u8>, _) = r?;
let Some(r) = key_offset_stream_b.next().await else {
panic!("second file at {path_b} has fewer keys than {path_a}");
};
let (key_b, offset_b): (Vec<u8>, _) = r?;
assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}");
let key = Key::from_slice(&key_a);
let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?;
let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?;
assert_eq!(
content_a, content_b,
"mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}"
);
//println!("match for key={key} cmp={cmp:?} from {path_a}");
}
Ok(())
}
}
impl ImageLayerInner {
@@ -782,7 +948,10 @@ impl ImageLayerWriterInner {
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
let (_img, res) = self
.blob_writer
.write_blob_compressed(img, ctx, self.conf.image_compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
@@ -796,11 +965,10 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
async fn finish(
async fn finish_inner(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -854,8 +1022,16 @@ impl ImageLayerWriterInner {
// fsync the file
file.sync_all().await?;
Ok((self.conf, desc, self.path))
}
async fn finish(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
let (conf, desc, path) = self.finish_inner(ctx).await?;
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
info!("created image layer {}", layer.local_path());
@@ -923,6 +1099,12 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
/// Obtains the current size of the file
pub(crate) fn size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}
///
/// Finish writing the image layer.
///

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env -S python3 -u
import argparse
import json
import os
from pprint import pprint
import matplotlib.pyplot as plt
parser = argparse.ArgumentParser(prog="compression-report")
parser.add_argument("dir")
args = parser.parse_args()
files = []
for file_name in os.listdir(args.dir):
if not file_name.endswith(".json"):
continue
file_path = os.path.join(args.dir, file_name)
with open(file_path) as json_str:
json_data = json.load(json_str)
files.append((file_name, json_data))
#pprint(files)
extra_zstd_lines = True
dc = 2 # data column to use (1 for sizes, 2 for time)
sort_by = "ZstdHigh"
files.sort(key=lambda file_data: [x for x in file_data[1] if x[0] == sort_by][0][dc])
x_axis = []
data_baseline = []
data_lz4 = []
data_zstd = []
data_zstd_low = []
data_zstd_high = []
for idx, f in enumerate(files):
file_data = f[1]
#pprint(file_data)
x_axis.append(idx)
data_baseline.append([x for x in file_data if x[0] is None][0][dc])
data_lz4.append([x for x in file_data if x[0] == "LZ4"][0][dc])
data_zstd.append([x for x in file_data if x[0] == "Zstd"][0][dc])
if extra_zstd_lines:
data_zstd_low.append([x for x in file_data if x[0] == "ZstdLow"][0][dc])
data_zstd_high.append([x for x in file_data if x[0] == "ZstdHigh"][0][dc])
plt.plot(x_axis, data_baseline, "x", markeredgewidth=2, label="baseline")
plt.plot(x_axis, data_lz4, "x", markeredgewidth=2, label="lz4")
plt.plot(x_axis, data_zstd, "x", markeredgewidth=2, label="Zstd")
if extra_zstd_lines:
plt.plot(x_axis, data_zstd_low, "x", markeredgewidth=2, label="ZstdLow")
plt.plot(x_axis, data_zstd_high, "x", markeredgewidth=2, label="ZstdHigh")
# plt.style.use('_mpl-gallery')
plt.ylim(bottom=0)
plt.legend(loc="upper left")
figure_path = os.path.join(args.dir, "figure.png")
print(f"saving figure to {figure_path}")
plt.savefig(figure_path)
plt.show()