Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-22 12:52:55 +00:00

Compare commits: cloneable/ ... arpad/comp (36 commits)
| SHA1 |
|---|
| 5e32cce2d9 |
| 6b67135bd3 |
| 66d3bef947 |
| e749e73ad6 |
| c5034f0e45 |
| d2533c06a5 |
| 0fdeca882c |
| f1bebda713 |
| 983972f812 |
| 2ea8d1b151 |
| 2f70221503 |
| 07bd0ce69e |
| 40e79712eb |
| 88b24e1593 |
| 8fcdc22283 |
| 2eb8b428cc |
| e6a0e7ec61 |
| 843d996cb1 |
| c824ffe1dc |
| dadbd87ac1 |
| 0e667dcd93 |
| 14447b98ce |
| 8fcb236783 |
| a9963db8c3 |
| 0c500450fe |
| 3182c3361a |
| 80803ff098 |
| 9b74d554b4 |
| fce252fb2c |
| f132658bd9 |
| d030cbffec |
| 554a6bd4a6 |
| 9850794250 |
| f5baac2579 |
| 2d37db234a |
| 8745c0d6f2 |
Cargo.lock (generated): 10 changed lines

```diff
@@ -2946,6 +2946,15 @@ dependencies = [
  "hashbrown 0.14.5",
 ]
 
+[[package]]
+name = "lz4_flex"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
+dependencies = [
+ "twox-hash",
+]
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -3612,6 +3621,7 @@ dependencies = [
  "hyper 0.14.26",
  "itertools",
  "leaky-bucket",
+ "lz4_flex",
  "md5",
  "metrics",
  "nix 0.27.1",
```
```diff
@@ -110,6 +110,7 @@ jsonwebtoken = "9"
 lasso = "0.7"
 leaky-bucket = "1.0.1"
 libc = "0.2"
+lz4_flex = "0.11"
 md5 = "0.7.0"
 measured = { version = "0.0.21", features=["lasso"] }
 measured-process = { version = "0.0.21" }
```
```diff
@@ -455,6 +455,26 @@ pub enum CompactionAlgorithm {
     Tiered,
 }
 
+#[derive(
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    Serialize,
+    Deserialize,
+    strum_macros::FromRepr,
+    strum_macros::EnumString,
+    enum_map::Enum,
+)]
+#[strum(serialize_all = "kebab-case")]
+pub enum ImageCompressionAlgorithm {
+    ZstdLow,
+    Zstd,
+    ZstdHigh,
+    LZ4,
+}
+
 #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
 pub struct CompactionAlgorithmSettings {
     pub kind: CompactionAlgorithm,
```
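The `#[strum(serialize_all = "kebab-case")]` attribute means the new variants are spelled in kebab case wherever they are parsed from text, for example in configuration values. A minimal stand-alone sketch of that mapping, keeping only the `EnumString` derive (the `strum = "0.26"` / `strum_macros = "0.26"` versions are assumptions, not taken from this diff):

```rust
use std::str::FromStr;
use strum_macros::EnumString;

// Trimmed-down copy of the enum above, keeping only the derive that matters for parsing.
#[derive(Debug, PartialEq, EnumString)]
#[strum(serialize_all = "kebab-case")]
enum ImageCompressionAlgorithm {
    ZstdLow,
    Zstd,
    ZstdHigh,
    LZ4,
}

fn main() {
    // "zstd-low" and "zstd-high" are the kebab-case spellings of ZstdLow and ZstdHigh.
    assert_eq!(
        ImageCompressionAlgorithm::from_str("zstd-low").unwrap(),
        ImageCompressionAlgorithm::ZstdLow
    );
    assert_eq!(
        ImageCompressionAlgorithm::from_str("zstd-high").unwrap(),
        ImageCompressionAlgorithm::ZstdHigh
    );
    // Unknown strings fail to parse instead of silently defaulting.
    assert!(ImageCompressionAlgorithm::from_str("gzip").is_err());
}
```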
```diff
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
 leaky-bucket.workspace = true
+lz4_flex.workspace = true
 md5.workspace = true
 nix.workspace = true
 # hack to get the number of worker threads tokio uses
```
```diff
@@ -1,4 +1,7 @@
+use std::num::NonZeroU32;
 use std::path::{Path, PathBuf};
+use std::str::FromStr;
+use std::sync::Arc;
 
 use anyhow::Result;
 use camino::{Utf8Path, Utf8PathBuf};
@@ -8,7 +11,7 @@ use pageserver::task_mgr::TaskKind;
 use pageserver::tenant::block_io::BlockCursor;
 use pageserver::tenant::disk_btree::DiskBtreeReader;
 use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
-use pageserver::tenant::storage_layer::{delta_layer, image_layer};
+use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName};
 use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
 use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
 use pageserver::{page_cache, virtual_file};
@@ -20,7 +23,12 @@ use pageserver::{
     },
     virtual_file::VirtualFile,
 };
+use pageserver_api::models::ImageCompressionAlgorithm;
 use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig};
 use std::fs;
+use tokio::sync::Semaphore;
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
 use utils::bin_ser::BeSer;
 use utils::id::{TenantId, TimelineId};
 
@@ -55,6 +63,17 @@ pub(crate) enum LayerCmd {
         #[clap(long)]
         new_timeline_id: Option<TimelineId>,
     },
+    CompressOne {
+        dest_path: Utf8PathBuf,
+        layer_file_path: Utf8PathBuf,
+    },
+    CompressMany {
+        tmp_dir: Utf8PathBuf,
+        tenant_remote_prefix: String,
+        tenant_remote_config: String,
+        layers_dir: Utf8PathBuf,
+        parallelism: Option<u32>,
+    },
 }
 
 async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -240,5 +259,138 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
 
             anyhow::bail!("not an image or delta layer: {layer_file_path}");
         }
+        LayerCmd::CompressOne {
+            dest_path,
+            layer_file_path,
+        } => {
+            pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+            pageserver::page_cache::init(100);
+
+            let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
+
+            let stats =
+                ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
+            println!(
+                "Statistics: {stats:#?}\n{}",
+                serde_json::to_string(&stats).unwrap()
+            );
+            Ok(())
+        }
+        LayerCmd::CompressMany {
+            tmp_dir,
+            tenant_remote_prefix,
+            tenant_remote_config,
+            layers_dir,
+            parallelism,
+        } => {
+            pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+            pageserver::page_cache::init(100);
+
+            let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
+            let toml_item = toml_document
+                .get("remote_storage")
+                .expect("need remote_storage");
+            let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
+            let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
+            let storage = Arc::new(storage);
+
+            let cancel = CancellationToken::new();
+            let path = RemotePath::from_string(tenant_remote_prefix)?;
+            let max_files = NonZeroU32::new(128_000);
+            let files_list = storage
+                .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
+                .await?;
+
+            println!("Listing gave {} keys", files_list.keys.len());
+
+            tokio::fs::create_dir_all(&layers_dir).await?;
+
+            let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
+
+            let mut tasks = JoinSet::new();
+            for (file_idx, file_key) in files_list.keys.iter().enumerate() {
+                let Some(file_name) = file_key.object_name() else {
+                    continue;
+                };
+                match LayerName::from_str(file_name) {
+                    Ok(LayerName::Delta(_)) => continue,
+                    Ok(LayerName::Image(_)) => (),
+                    Err(_e) => {
+                        // Split off the final part. We ensured above that this is not turning a
+                        // generation-less delta layer file name into an image layer file name.
+                        let Some(file_without_generation) = file_name.rsplit_once('-') else {
+                            continue;
+                        };
+                        let Ok(LayerName::Image(_layer_file_name)) =
+                            LayerName::from_str(file_without_generation.0)
+                        else {
+                            // Skipping because it's either not a layer or an image layer
+                            //println!("object {file_name}: not an image layer");
+                            continue;
+                        };
+                    }
+                }
+                let json_file_path = layers_dir.join(format!("{file_name}.json"));
+                if tokio::fs::try_exists(&json_file_path).await? {
+                    //println!("object {file_name}: report already created");
+                    // If we have already created a report for the layer, skip it.
+                    continue;
+                }
+                let local_layer_path = layers_dir.join(file_name);
+                async fn stats(
+                    semaphore: Arc<Semaphore>,
+                    local_layer_path: Utf8PathBuf,
+                    json_file_path: Utf8PathBuf,
+                    tmp_dir: Utf8PathBuf,
+                    storage: Arc<GenericRemoteStorage>,
+                    file_key: RemotePath,
+                ) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
+                {
+                    let _permit = semaphore.acquire().await?;
+                    let cancel = CancellationToken::new();
+                    let download = storage.download(&file_key, &cancel).await?;
+                    let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
+                    let mut body = tokio_util::io::StreamReader::new(download.download_stream);
+                    let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
+                    println!("Downloaded file to {local_layer_path}");
+                    let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
+                    let stats =
+                        ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx)
+                            .await?;
+
+                    let stats_str = serde_json::to_string(&stats).unwrap();
+                    tokio::fs::write(json_file_path, stats_str).await?;
+
+                    tokio::fs::remove_file(&local_layer_path).await?;
+                    Ok(stats)
+                }
+                let semaphore = semaphore.clone();
+                let file_key = file_key.to_owned();
+                let storage = storage.clone();
+                let tmp_dir = tmp_dir.to_owned();
+                let file_name = file_name.to_owned();
+                let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64;
+                tasks.spawn(async move {
+                    let stats = stats(
+                        semaphore,
+                        local_layer_path.to_owned(),
+                        json_file_path.to_owned(),
+                        tmp_dir,
+                        storage,
+                        file_key,
+                    )
+                    .await;
+                    match stats {
+                        Ok(stats) => {
+                            println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n")
+                        }
+                        Err(e) => eprintln!("Error for {file_name}: {e:?}"),
+                    };
+                });
+            }
+            while let Some(_res) = tasks.join_next().await {}
+
+            Ok(())
+        }
     }
 }
```
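The CompressMany arm bounds its concurrency with an `Arc<Semaphore>` shared by tasks spawned onto a `JoinSet`: each task takes a permit before downloading and analysing one layer, so at most `parallelism` layers are in flight at once. A stand-alone sketch of just that pattern, assuming `tokio = { version = "1", features = ["full"] }`; the task body here is a placeholder, not the real download logic:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let parallelism = 4;
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let mut tasks = JoinSet::new();

    for idx in 0..16u32 {
        let semaphore = semaphore.clone();
        tasks.spawn(async move {
            // Held for the whole task body, like `_permit` in the `stats` helper above.
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            // ... download one layer, compute compression statistics, write the JSON report ...
            idx
        });
    }

    // Drain the set; per-task errors would be reported here, as in the diff.
    while let Some(res) = tasks.join_next().await {
        let idx = res.expect("task panicked");
        println!("finished layer {idx}");
    }
}
```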
```diff
@@ -5,7 +5,7 @@
 //! See also `settings.md` for better description on every parameter.
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
-use pageserver_api::shard::TenantShardId;
+use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
 use remote_storage::{RemotePath, RemoteStorageConfig};
 use serde;
 use serde::de::IntoDeserializer;
@@ -55,6 +55,7 @@ pub mod defaults {
         DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
         DEFAULT_PG_LISTEN_PORT,
     };
+    use pageserver_api::models::ImageCompressionAlgorithm;
    pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
 
     pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
@@ -95,6 +96,8 @@ pub mod defaults {
 
     pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
 
+    pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
+
     pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
 
     pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -290,6 +293,8 @@ pub struct PageServerConf {
 
     pub validate_vectored_get: bool,
 
+    pub image_compression: Option<ImageCompressionAlgorithm>,
+
     /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
     /// is exceeded, we start proactively closing ephemeral layers to limit the total amount
     /// of ephemeral data.
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
 
     validate_vectored_get: BuilderValue<bool>,
 
+    image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
+
     ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
 }
 
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
             max_vectored_read_bytes: Set(MaxVectoredReadBytes(
                 NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
             )),
+            image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
             validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
             ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
         }
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
         self.validate_vectored_get = BuilderValue::Set(value);
     }
 
+    pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
+        self.image_compression = BuilderValue::Set(value);
+    }
+
     pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
         self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
     }
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
             get_impl,
             max_vectored_read_bytes,
             validate_vectored_get,
+            image_compression,
             ephemeral_bytes_per_memory_kb,
         }
         CUSTOM LOGIC
@@ -1026,6 +1039,9 @@ impl PageServerConf {
                 "validate_vectored_get" => {
                     builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
                 }
+                "image_compression" => {
+                    builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
+                }
                 "ephemeral_bytes_per_memory_kb" => {
                     builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
                 }
@@ -1110,6 +1126,7 @@ impl PageServerConf {
                 NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
                     .expect("Invalid default constant"),
             ),
+            image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
             validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
             ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
         }
@@ -1350,6 +1367,7 @@ background_task_maximum_delay = '334 s'
                     .expect("Invalid default constant")
                 ),
                 validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
+                image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
                 ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
             },
             "Correct defaults should be used when no config values are provided"
@@ -1423,6 +1441,7 @@ background_task_maximum_delay = '334 s'
                     .expect("Invalid default constant")
                 ),
                 validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
+                image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
                 ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
             },
             "Should be able to parse all basic config values correctly"
```
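With the `image_compression` knob wired through the builder, a `pageserver.toml` would carry the kebab-case value that `parse_toml_from_str` hands to the enum's `FromStr` impl. A small sketch of pulling that key out of a TOML document with `toml_edit`, the same crate the CompressMany command uses; the config fragment and the `"zstd-low"` value are illustrative assumptions:

```rust
use std::str::FromStr;
use toml_edit::Document;

fn main() {
    // Hypothetical fragment of a pageserver.toml; only image_compression matters here.
    let text = r#"
validate_vectored_get = true
image_compression = "zstd-low"
"#;
    let doc = Document::from_str(text).expect("valid TOML");

    // A missing key would leave the Option at None, i.e. DEFAULT_IMAGE_COMPRESSION
    // (compression disabled).
    let image_compression = doc
        .get("image_compression")
        .and_then(|item| item.as_str());
    assert_eq!(image_compression, Some("zstd-low"));
    println!("image_compression = {image_compression:?}");
}
```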
```diff
@@ -11,7 +11,10 @@
 //! len < 128: 0XXXXXXX
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
+use async_compression::Level;
 use bytes::{BufMut, BytesMut};
+use pageserver_api::models::ImageCompressionAlgorithm;
+use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
 
 use crate::context::RequestContext;
@@ -66,12 +69,29 @@ impl<'a> BlockCursor<'a> {
                 len_buf.copy_from_slice(&buf[off..off + 4]);
                 off += 4;
             }
-            len_buf[0] &= 0x7f;
+            len_buf[0] &= 0x0f;
             u32::from_be_bytes(len_buf) as usize
         };
+        let compression_bits = first_len_byte & 0xf0;
+
-        dstbuf.clear();
-        dstbuf.reserve(len);
+        let mut tmp_buf = Vec::new();
+        let buf_to_write;
+        let compression = if compression_bits <= BYTE_UNCOMPRESSED {
+            buf_to_write = dstbuf;
+            None
+        } else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 {
+            buf_to_write = &mut tmp_buf;
+            Some(dstbuf)
+        } else {
+            let error = std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!("invalid compression byte {compression_bits:x}"),
+            );
+            return Err(error);
+        };
+
+        buf_to_write.clear();
+        buf_to_write.reserve(len);
 
         // Read the payload
         let mut remain = len;
@@ -85,14 +105,38 @@ impl<'a> BlockCursor<'a> {
                 page_remain = PAGE_SZ;
             }
             let this_blk_len = min(remain, page_remain);
-            dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
+            buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
             remain -= this_blk_len;
             off += this_blk_len;
         }
+
+        if let Some(dstbuf) = compression {
+            if compression_bits == BYTE_ZSTD {
+                let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
+                decoder.write_all(buf_to_write).await?;
+                decoder.flush().await?;
+            } else if compression_bits == BYTE_LZ4 {
+                let decompressed = lz4_flex::block::decompress_size_prepended(&buf_to_write)
+                    .map_err(|e| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::InvalidData,
+                            format!("lz4 decompression error: {e:?}"),
+                        )
+                    })?;
+                dstbuf.extend_from_slice(&decompressed);
+            } else {
+                unreachable!("already checked above")
+            }
+        }
+
         Ok(())
     }
 }
 
+const BYTE_UNCOMPRESSED: u8 = 0x80;
+const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
+const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20;
+
 /// A wrapper of `VirtualFile` that allows users to write blobs.
 ///
 /// If a `BlobWriter` is dropped, the internal buffer will be
```
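The three constants define the top nibble of the 4-byte length header: bit 7 marks the long format (the `1XXXXXXX` case in the module comment), bits 4 and 5 select the compression algorithm, and the remaining 28 bits hold the stored length, which is why the write path rejects blobs above `0x0fff_ffff` bytes. A stand-alone sketch of that layout; the helper names are made up for illustration and are not part of the diff:

```rust
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20;

// Hypothetical helpers: pack and unpack the 4-byte long-format header.
fn encode_header(len: u32, marker: u8) -> [u8; 4] {
    assert!(len <= 0x0fff_ffff, "length must fit in the low 28 bits");
    let mut buf = len.to_be_bytes();
    buf[0] |= marker; // top nibble: long-format bit plus compression bits
    buf
}

fn decode_header(buf: [u8; 4]) -> (u8, usize) {
    let compression_bits = buf[0] & 0xf0;
    let mut len_buf = buf;
    len_buf[0] &= 0x0f; // keep only the 28-bit length
    (compression_bits, u32::from_be_bytes(len_buf) as usize)
}

fn main() {
    let (bits, len) = decode_header(encode_header(300, BYTE_ZSTD));
    assert_eq!((bits, len), (BYTE_ZSTD, 300));

    let (bits, len) = decode_header(encode_header(300, BYTE_UNCOMPRESSED));
    assert_eq!((bits, len), (BYTE_UNCOMPRESSED, 300));

    // LZ4 uses the same scheme with its own marker bit.
    assert_eq!(decode_header(encode_header(7, BYTE_LZ4)).0, BYTE_LZ4);
}
```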
```diff
@@ -219,6 +263,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         &mut self,
         srcbuf: B,
         ctx: &RequestContext,
     ) -> (B::Buf, Result<u64, Error>) {
+        self.write_blob_compressed(srcbuf, ctx, None).await
+    }
+
+    /// Write a blob of data. Returns the offset that it was written to,
+    /// which can be used to retrieve the data later.
+    pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &mut self,
+        srcbuf: B,
+        ctx: &RequestContext,
+        algorithm: Option<ImageCompressionAlgorithm>,
+    ) -> (B::Buf, Result<u64, Error>) {
         let offset = self.offset;
 
@@ -226,29 +281,75 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 
         let mut io_buf = self.io_buf.take().expect("we always put it back below");
         io_buf.clear();
-        let (io_buf, hdr_res) = async {
+        let mut compressed_buf = None;
+        let ((io_buf, hdr_res), srcbuf) = async {
             if len < 128 {
                 // Short blob. Write a 1-byte length header
                 io_buf.put_u8(len as u8);
-                self.write_all(io_buf, ctx).await
+                (
+                    self.write_all(io_buf, ctx).await,
+                    srcbuf.slice(..).into_inner(),
+                )
             } else {
                 // Write a 4-byte length header
-                if len > 0x7fff_ffff {
+                if len > 0x0fff_ffff {
                     return (
-                        io_buf,
-                        Err(Error::new(
-                            ErrorKind::Other,
-                            format!("blob too large ({len} bytes)"),
-                        )),
+                        (
+                            io_buf,
+                            Err(Error::new(
+                                ErrorKind::Other,
+                                format!("blob too large ({len} bytes)"),
+                            )),
+                        ),
+                        srcbuf.slice(..).into_inner(),
                     );
                 }
-                if len > 0x0fff_ffff {
-                    tracing::warn!("writing blob above future limit ({len} bytes)");
-                }
-                let mut len_buf = (len as u32).to_be_bytes();
-                len_buf[0] |= 0x80;
+                use ImageCompressionAlgorithm::*;
+                let (high_bit_mask, len_written, srcbuf) = match algorithm {
+                    Some(ZstdLow | Zstd | ZstdHigh) => {
+                        let mut encoder = if matches!(algorithm, Some(ZstdLow)) {
+                            async_compression::tokio::write::ZstdEncoder::with_quality(
+                                Vec::new(),
+                                Level::Precise(1),
+                            )
+                        } else if matches!(algorithm, Some(ZstdHigh)) {
+                            async_compression::tokio::write::ZstdEncoder::with_quality(
+                                Vec::new(),
+                                Level::Precise(6),
+                            )
+                        } else {
+                            async_compression::tokio::write::ZstdEncoder::new(Vec::new())
+                        };
+                        let slice = srcbuf.slice(..);
+                        encoder.write_all(&slice[..]).await.unwrap();
+                        encoder.shutdown().await.unwrap();
+                        let compressed = encoder.into_inner();
+                        if compressed.len() < len {
+                            let compressed_len = compressed.len();
+                            compressed_buf = Some(compressed);
+                            (BYTE_ZSTD, compressed_len, slice.into_inner())
+                        } else {
+                            (BYTE_UNCOMPRESSED, len, slice.into_inner())
+                        }
+                    }
+                    Some(ImageCompressionAlgorithm::LZ4) => {
+                        let slice = srcbuf.slice(..);
+                        let compressed = lz4_flex::block::compress_prepend_size(&slice[..]);
+                        if compressed.len() < len {
+                            let compressed_len = compressed.len();
+                            compressed_buf = Some(compressed);
+                            (BYTE_LZ4, compressed_len, slice.into_inner())
+                        } else {
+                            (BYTE_UNCOMPRESSED, len, slice.into_inner())
+                        }
+                    }
+                    None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()),
+                };
+                let mut len_buf = (len_written as u32).to_be_bytes();
+                assert_eq!(len_buf[0] & 0xf0, 0);
+                len_buf[0] |= high_bit_mask;
                 io_buf.extend_from_slice(&len_buf[..]);
-                self.write_all(io_buf, ctx).await
+                (self.write_all(io_buf, ctx).await, srcbuf)
             }
         }
         .await;
```
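For LZ4 the write path uses `lz4_flex::block::compress_prepend_size` and only keeps the compressed bytes when they come out smaller than the input, falling back to `BYTE_UNCOMPRESSED` otherwise; the read path reverses it with `decompress_size_prepended`. A stand-alone round-trip sketch of that decision; the lz4_flex version is whatever the `0.11` pin in Cargo.toml resolves to, and the payload is made up:

```rust
fn main() {
    // A highly compressible payload; real image layer blobs are page images.
    let payload = vec![0u8; 8192];

    let compressed = lz4_flex::block::compress_prepend_size(&payload);

    // Keep the compressed form only if it is smaller, mirroring the fallback to
    // BYTE_UNCOMPRESSED in write_blob_compressed above.
    let store_compressed = compressed.len() < payload.len();
    assert!(store_compressed);

    // The matching call used by BlockCursor::read_blob for BYTE_LZ4 blobs.
    let roundtripped = lz4_flex::block::decompress_size_prepended(&compressed).unwrap();
    assert_eq!(roundtripped, payload);

    println!(
        "original {} bytes, lz4 {} bytes, store compressed: {store_compressed}",
        payload.len(),
        compressed.len()
    );
}
```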
```diff
@@ -257,7 +358,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
             Ok(_) => (),
             Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
         }
-        let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
+        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
+            let (_buf, res) = self.write_all(compressed_buf, ctx).await;
+            (Slice::into_inner(srcbuf.slice(..)), res)
+        } else {
+            self.write_all(srcbuf, ctx).await
+        };
         (srcbuf, res.map(|_| offset))
     }
 }
@@ -295,6 +401,12 @@ mod tests {
     use rand::{Rng, SeedableRng};
 
     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
+        round_trip_test_compressed::<BUFFERED, 0>(blobs).await
+    }
+
+    async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
+        blobs: &[Vec<u8>],
+    ) -> Result<(), Error> {
         let temp_dir = camino_tempfile::tempdir()?;
         let pathbuf = temp_dir.path().join("file");
         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
@@ -305,7 +417,26 @@ mod tests {
         let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
         let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
         for blob in blobs.iter() {
-            let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
+            let (_, res) = match COMPRESSION {
+                0 => wtr.write_blob(blob.clone(), &ctx).await,
+                1 => {
+                    wtr.write_blob_compressed(
+                        blob.clone(),
+                        &ctx,
+                        Some(ImageCompressionAlgorithm::ZstdLow),
+                    )
+                    .await
+                }
+                2 => {
+                    wtr.write_blob_compressed(
+                        blob.clone(),
+                        &ctx,
+                        Some(ImageCompressionAlgorithm::LZ4),
+                    )
+                    .await
+                }
+                _ => unreachable!("Invalid compression {COMPRESSION}"),
+            };
             let offs = res?;
             offsets.push(offs);
         }
@@ -361,10 +492,17 @@ mod tests {
         let blobs = &[
             b"test".to_vec(),
             random_array(10 * PAGE_SZ),
             b"hello".to_vec(),
+            random_array(66 * PAGE_SZ),
+            vec![0xf3; 24 * PAGE_SZ],
             b"foobar".to_vec(),
         ];
         round_trip_test::<false>(blobs).await?;
         round_trip_test::<true>(blobs).await?;
+        round_trip_test_compressed::<false, 1>(blobs).await?;
+        round_trip_test_compressed::<true, 1>(blobs).await?;
+        round_trip_test_compressed::<false, 2>(blobs).await?;
+        round_trip_test_compressed::<true, 2>(blobs).await?;
         Ok(())
     }
```
```diff
@@ -46,17 +46,19 @@ use camino::{Utf8Path, Utf8PathBuf};
 use hex;
 use itertools::Itertools;
 use pageserver_api::keyspace::KeySpace;
-use pageserver_api::models::LayerAccessKind;
+use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs::File;
 use std::io::SeekFrom;
 use std::ops::Range;
+use std::os::unix::fs::MetadataExt;
 use std::os::unix::prelude::FileExt;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::sync::OnceCell;
+use tokio::time::Instant;
 use tokio_stream::StreamExt;
 use tracing::*;
 
```
```diff
@@ -366,6 +368,170 @@ impl ImageLayer {
         res?;
         Ok(())
     }
+    pub async fn compression_statistics(
+        dest_repo_path: &Utf8Path,
+        path: &Utf8Path,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>> {
+        fn make_conf(
+            image_compression: Option<ImageCompressionAlgorithm>,
+            dest_repo_path: &Utf8Path,
+        ) -> &'static PageServerConf {
+            let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
+            conf.image_compression = image_compression;
+            Box::leak(Box::new(conf))
+        }
+        let image_compressions = [
+            None,
+            Some(ImageCompressionAlgorithm::ZstdLow),
+            Some(ImageCompressionAlgorithm::Zstd),
+            Some(ImageCompressionAlgorithm::ZstdHigh),
+            Some(ImageCompressionAlgorithm::LZ4),
+        ];
+        let confs = image_compressions
+            .clone()
+            .map(|compression| make_conf(compression, dest_repo_path));
+        let mut stats = Vec::new();
+        for (image_compression, conf) in image_compressions.into_iter().zip(confs) {
+            let start_compression = Instant::now();
+            let compressed_path = Self::compress_for_conf(path, ctx, conf).await?;
+            let path_to_delete = compressed_path.clone();
+            scopeguard::defer!({
+                let _ = std::fs::remove_file(path_to_delete);
+            });
+            let size = path.metadata()?.size();
+            let elapsed_ms = start_compression.elapsed().as_millis() as u64;
+            let start_decompression = Instant::now();
+            Self::compare_are_equal(path, &compressed_path, ctx, &image_compression).await?;
+            let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64;
+            stats.push((
+                image_compression,
+                size,
+                elapsed_ms,
+                elapsed_decompression_ms,
+            ));
+            tokio::task::yield_now().await;
+        }
+        Ok(stats)
+    }
+
+    async fn compress_for_conf(
+        path: &Utf8Path,
+        ctx: &RequestContext,
+        conf: &'static PageServerConf,
+    ) -> anyhow::Result<Utf8PathBuf> {
+        let file =
+            VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
+                .await
+                .with_context(|| format!("Failed to open file '{}'", path))?;
+
+        let file_id = page_cache::next_file_id();
+        let block_reader = FileBlockReader::new(&file, file_id);
+        let summary_blk = block_reader.read_blk(0, ctx).await?;
+        let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
+        if summary.magic != IMAGE_FILE_MAGIC {
+            anyhow::bail!("magic file mismatch");
+        }
+
+        let tree_reader = DiskBtreeReader::new(
+            summary.index_start_blk,
+            summary.index_root_blk,
+            &block_reader,
+        );
+
+        let mut key_offset_stream =
+            std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
+
+        let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
+        let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id);
+        tokio::fs::create_dir_all(timeline_path).await?;
+
+        let mut writer = ImageLayerWriter::new(
+            conf,
+            summary.timeline_id,
+            tenant_shard_id,
+            &summary.key_range,
+            summary.lsn,
+            ctx,
+        )
+        .await?;
+
+        let cursor = block_reader.block_cursor();
+        while let Some(r) = key_offset_stream.next().await {
+            let (key, offset) = r?;
+            let key = Key::from_slice(&key);
+            let content = cursor.read_blob(offset, ctx).await?;
+            writer.put_image(key, content.into(), ctx).await?;
+        }
+        let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2;
+        Ok(path)
+    }
+
+    async fn compare_are_equal(
+        path_a: &Utf8Path,
+        path_b: &Utf8Path,
+        ctx: &RequestContext,
+        cmp: &Option<ImageCompressionAlgorithm>,
+    ) -> anyhow::Result<()> {
+        let mut files = Vec::new();
+        for path in [path_a, path_b] {
+            let file = VirtualFile::open_with_options(
+                path,
+                virtual_file::OpenOptions::new().read(true),
+                ctx,
+            )
+            .await
+            .with_context(|| format!("Failed to open file '{}'", path))?;
+            files.push(file);
+        }
+
+        let mut readers_summaries = Vec::new();
+        for file in files.iter() {
+            let file_id = page_cache::next_file_id();
+            let block_reader = FileBlockReader::new(&file, file_id);
+            let summary_blk = block_reader.read_blk(0, ctx).await?;
+            let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
+            if summary.magic != IMAGE_FILE_MAGIC {
+                anyhow::bail!("magic file mismatch");
+            }
+            readers_summaries.push((block_reader, summary));
+        }
+
+        let mut tree_readers_cursors = Vec::new();
+        for (block_reader, summary) in readers_summaries.iter() {
+            let tree_reader = DiskBtreeReader::new(
+                summary.index_start_blk,
+                summary.index_root_blk,
+                block_reader,
+            );
+            let cursor = block_reader.block_cursor();
+            tree_readers_cursors.push((tree_reader, cursor));
+        }
+
+        let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0]
+            .0
+            .get_stream_from(&[0u8; KEY_SIZE], ctx));
+        let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[1]
+            .0
+            .get_stream_from(&[0u8; KEY_SIZE], ctx));
+        while let Some(r) = key_offset_stream_a.next().await {
+            let (key_a, offset_a): (Vec<u8>, _) = r?;
+            let Some(r) = key_offset_stream_b.next().await else {
+                panic!("second file at {path_b} has fewer keys than {path_a}");
+            };
+            let (key_b, offset_b): (Vec<u8>, _) = r?;
+            assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}");
+            let key = Key::from_slice(&key_a);
+            let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?;
+            let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?;
+            assert_eq!(
+                content_a, content_b,
+                "mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}"
+            );
+            //println!("match for key={key} cmp={cmp:?} from {path_a}");
+        }
+        Ok(())
+    }
 }
 
 impl ImageLayerInner {
```
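`compression_statistics` returns one row per algorithm as a `(Option<ImageCompressionAlgorithm>, u64, u64, u64)` tuple, and the pagectl commands dump that vector with `serde_json`; unit enum variants serialize as bare strings and tuples as arrays, which is exactly the shape `scripts/plot-compression-report.py` indexes with `x[0]` and the `dc` column. A stand-alone sketch of that report shape: the numbers are invented, the enum is re-declared here with only the `Serialize` derive, and `serde = "1"` (derive feature) plus `serde_json = "1"` are assumed dependencies:

```rust
use serde::Serialize;

// Reduced copy of the enum from pageserver_api, keeping only what serialization needs.
#[derive(Debug, Clone, Copy, Serialize)]
enum ImageCompressionAlgorithm {
    ZstdLow,
    Zstd,
    ZstdHigh,
    LZ4,
}

fn main() {
    // (algorithm, size, compression ms, decompression ms) -- values invented for illustration.
    let stats: Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)> = vec![
        (None, 268_435_456, 810, 430),
        (Some(ImageCompressionAlgorithm::ZstdLow), 268_435_456, 1_950, 610),
        (Some(ImageCompressionAlgorithm::LZ4), 268_435_456, 1_100, 480),
    ];

    // Prints rows like [null,268435456,810,430] and ["ZstdLow",268435456,1950,610],
    // which is what the plot script matches against "ZstdLow", "LZ4", and None.
    println!("{}", serde_json::to_string(&stats).unwrap());
}
```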
```diff
@@ -782,7 +948,10 @@ impl ImageLayerWriterInner {
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
+        let (_img, res) = self
+            .blob_writer
+            .write_blob_compressed(img, ctx, self.conf.image_compression)
+            .await;
         // TODO: re-use the buffer for `img` further upstack
         let off = res?;
 
@@ -796,11 +965,10 @@ impl ImageLayerWriterInner {
     ///
     /// Finish writing the image layer.
     ///
-    async fn finish(
+    async fn finish_inner(
         self,
-        timeline: &Arc<Timeline>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<ResidentLayer> {
+    ) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> {
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
 
@@ -854,8 +1022,16 @@ impl ImageLayerWriterInner {
         // fsync the file
         file.sync_all().await?;
 
+        Ok((self.conf, desc, self.path))
+    }
+    async fn finish(
+        self,
+        timeline: &Arc<Timeline>,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<ResidentLayer> {
+        let (conf, desc, path) = self.finish_inner(ctx).await?;
         // FIXME: why not carry the virtualfile here, it supports renaming?
-        let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
+        let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
 
         info!("created image layer {}", layer.local_path());
 
@@ -923,6 +1099,12 @@ impl ImageLayerWriter {
         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     }
 
+    /// Obtains the current size of the file
+    pub(crate) fn size(&self) -> u64 {
+        let inner = self.inner.as_ref().unwrap();
+        inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
+    }
+
     ///
     /// Finish writing the image layer.
     ///
```
scripts/plot-compression-report.py (new executable file, 63 lines):

```python
#!/usr/bin/env -S python3 -u

import argparse
import json
import os
from pprint import pprint

import matplotlib.pyplot as plt

parser = argparse.ArgumentParser(prog="compression-report")
parser.add_argument("dir")
args = parser.parse_args()

files = []
for file_name in os.listdir(args.dir):
    if not file_name.endswith(".json"):
        continue
    file_path = os.path.join(args.dir, file_name)
    with open(file_path) as json_str:
        json_data = json.load(json_str)
    files.append((file_name, json_data))
#pprint(files)

extra_zstd_lines = True
dc = 2 # data column to use (1 for sizes, 2 for time)
sort_by = "ZstdHigh"
files.sort(key=lambda file_data: [x for x in file_data[1] if x[0] == sort_by][0][dc])


x_axis = []
data_baseline = []
data_lz4 = []
data_zstd = []
data_zstd_low = []
data_zstd_high = []

for idx, f in enumerate(files):
    file_data = f[1]
    #pprint(file_data)

    x_axis.append(idx)
    data_baseline.append([x for x in file_data if x[0] is None][0][dc])
    data_lz4.append([x for x in file_data if x[0] == "LZ4"][0][dc])
    data_zstd.append([x for x in file_data if x[0] == "Zstd"][0][dc])
    if extra_zstd_lines:
        data_zstd_low.append([x for x in file_data if x[0] == "ZstdLow"][0][dc])
        data_zstd_high.append([x for x in file_data if x[0] == "ZstdHigh"][0][dc])

plt.plot(x_axis, data_baseline, "x", markeredgewidth=2, label="baseline")
plt.plot(x_axis, data_lz4, "x", markeredgewidth=2, label="lz4")
plt.plot(x_axis, data_zstd, "x", markeredgewidth=2, label="Zstd")
if extra_zstd_lines:
    plt.plot(x_axis, data_zstd_low, "x", markeredgewidth=2, label="ZstdLow")
    plt.plot(x_axis, data_zstd_high, "x", markeredgewidth=2, label="ZstdHigh")

# plt.style.use('_mpl-gallery')
plt.ylim(bottom=0)
plt.legend(loc="upper left")

figure_path = os.path.join(args.dir, "figure.png")
print(f"saving figure to {figure_path}")
plt.savefig(figure_path)
plt.show()
```