Compare commits

15 Commits

Author SHA1 Message Date
Konstantin Knizhnik
70d1086e0f Prepare for first stage of deployment: do not bump format version and do not write data in new format, but recognize the new format 2024-03-15 10:02:51 +02:00
Konstantin Knizhnik
5a8e8baf9f Make ruff happy 2024-03-14 18:05:30 +02:00
Konstantin Knizhnik
57a4119a7b Add test for compression 2024-03-14 16:45:45 +02:00
Konstantin Knizhnik
aaef3789b0 Ignore format version when comparing summary for delta_layer 2024-03-14 14:21:35 +02:00
Konstantin Knizhnik
0b57e0b8f2 Fix image layer format version matching 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
485ecbaf8f Fix test_attach_tenant_config.py test 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
0bcbce197a Fix test_attach_tenant_config.py test 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
19d59e58d2 Use CompressionAlgorithm enum 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
ce65d13dbd Add compress_image_layer to openapi spec 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
18fefff026 Fix compressed blob writer 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
2a69861896 Fix parse_tenant_config test 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
98375b3896 Support vectored compressed blob reads 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
8c60359ae5 Enable image layer compression by default 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
8c7136b057 Add compress_image_layer property to TenantConfig 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
0df6c41eaa Compress image layer 2024-03-14 08:33:37 +02:00
18 changed files with 345 additions and 74 deletions

Cargo.lock (generated)
View File

@@ -2841,6 +2841,15 @@ version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 
+[[package]]
+name = "lz4_flex"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
+dependencies = [
+ "twox-hash",
+]
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -3511,6 +3520,7 @@ dependencies = [
  "hyper",
  "itertools",
  "leaky-bucket",
+ "lz4_flex",
  "md5",
  "metrics",
  "nix 0.27.1",

View File

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
 lasso = "0.7"
 leaky-bucket = "1.0.1"
 libc = "0.2"
+lz4_flex = "0.11.1"
 md5 = "0.7.0"
 memoffset = "0.8"
 native-tls = "0.2"

View File

@@ -384,6 +384,11 @@ impl PageServerNode {
                 .map(|x| x.parse::<bool>())
                 .transpose()
                 .context("Failed to parse 'trace_read_requests' as bool")?,
+            image_layer_compression: settings
+                .remove("image_layer_compression")
+                .map(serde_json::from_str)
+                .transpose()
+                .context("Failed to parse 'image_layer_compression' json")?,
             eviction_policy: settings
                 .remove("eviction_policy")
                 .map(serde_json::from_str)
@@ -496,6 +501,11 @@ impl PageServerNode {
                 .map(|x| x.parse::<bool>())
                 .transpose()
                 .context("Failed to parse 'trace_read_requests' as bool")?,
+            image_layer_compression: settings
+                .remove("image_layer_compression")
+                .map(serde_json::from_str)
+                .transpose()
+                .context("Failed to parse 'image_layer_compression' json")?,
             eviction_policy: settings
                 .remove("eviction_policy")
                 .map(serde_json::from_str)

View File

@@ -294,6 +294,7 @@ pub struct TenantConfig {
     pub lagging_wal_timeout: Option<String>,
     pub max_lsn_wal_lag: Option<NonZeroU64>,
     pub trace_read_requests: Option<bool>,
+    pub image_layer_compression: Option<CompressionAlgorithm>,
     pub eviction_policy: Option<EvictionPolicy>,
     pub min_resident_size_override: Option<u64>,
     pub evictions_low_residence_duration_metric_threshold: Option<String>,
@@ -327,6 +328,23 @@ pub enum CompactionAlgorithm {
     Tiered,
 }
 
+#[derive(
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    Serialize,
+    Deserialize,
+    strum_macros::FromRepr,
+    enum_map::Enum,
+)]
+#[repr(u8)]
+pub enum CompressionAlgorithm {
+    NoCompression,
+    LZ4,
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 pub struct EvictionPolicyLayerAccessThreshold {
     #[serde(with = "humantime_serde")]
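 
Note: the #[repr(u8)] discriminant of the new enum doubles as the on-disk prefix byte, and strum_macros::FromRepr derives the reverse mapping used by the readers below. A minimal self-contained sketch of that mapping, with from_repr hand-rolled here instead of derived, purely for illustration:

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionAlgorithm {
    NoCompression, // stored on disk as 0
    LZ4,           // stored on disk as 1
}

impl CompressionAlgorithm {
    // Hand-rolled equivalent of the strum_macros::FromRepr derive.
    fn from_repr(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::NoCompression),
            1 => Some(Self::LZ4),
            _ => None, // unknown algorithm byte, e.g. from a corrupted header
        }
    }
}

fn main() {
    assert_eq!(CompressionAlgorithm::LZ4 as u8, 1);
    assert_eq!(
        CompressionAlgorithm::from_repr(0),
        Some(CompressionAlgorithm::NoCompression)
    );
    assert_eq!(CompressionAlgorithm::from_repr(42), None);
}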

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
 leaky-bucket.workspace = true
+lz4_flex.workspace = true
 md5.workspace = true
 nix.workspace = true
 # hack to get the number of worker threads tokio uses

View File

@@ -1538,6 +1538,7 @@ broker_endpoint = '{broker_endpoint}'
         let broker_endpoint = "http://127.0.0.1:7777";
         let trace_read_requests = true;
+        let image_layer_compression = pageserver_api::models::CompressionAlgorithm::LZ4;
 
         let config_string = format!(
             r#"{ALL_BASE_VALUES_TOML}
@@ -1545,7 +1546,8 @@ pg_distrib_dir='{pg_distrib_dir}'
 broker_endpoint = '{broker_endpoint}'
 
 [tenant_config]
-trace_read_requests = {trace_read_requests}"#,
+trace_read_requests = {trace_read_requests}
+image_layer_compression = 'LZ4'"#,
         );
 
         let toml = config_string.parse()?;
@@ -1555,6 +1557,10 @@ trace_read_requests = {trace_read_requests}"#,
             conf.default_tenant_conf.trace_read_requests, trace_read_requests,
             "Tenant config from pageserver config file should be parsed and updated values used as defaults for all tenants",
         );
+        assert_eq!(
+            conf.default_tenant_conf.image_layer_compression, image_layer_compression,
+            "Tenant config from pageserver config file should be parsed and updated values used as defaults for all tenants",
+        );
 
         Ok(())
     }

View File

@@ -1444,6 +1444,8 @@ components:
         type: integer
       trace_read_requests:
         type: boolean
+      image_layer_compression:
+        type: string
       heatmap_period:
         type: string
     TenantConfigResponse:

View File

@@ -42,6 +42,9 @@ use tracing::info;
 /// backwards-compatible changes to the metadata format.
 pub const STORAGE_FORMAT_VERSION: u16 = 3;
 
+/// Minimal storage format version with compression support
+pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
+
 pub const DEFAULT_PG_VERSION: u32 = 15;
 
 // Magic constants used to identify different kinds of files

View File

@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
 use pageserver_api::models;
+use pageserver_api::models::CompressionAlgorithm;
 use pageserver_api::models::TimelineState;
 use pageserver_api::models::WalRedoManagerStatus;
 use pageserver_api::shard::ShardIdentity;
@@ -2288,6 +2289,13 @@ impl Tenant {
             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
     }
 
+    pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
+        let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
+        tenant_conf
+            .image_layer_compression
+            .unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
+    }
+
     pub fn get_min_resident_size_override(&self) -> Option<u64> {
         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
         tenant_conf
@@ -3637,6 +3645,7 @@ pub(crate) mod harness {
             lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
             max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
             trace_read_requests: Some(tenant_conf.trace_read_requests),
+            image_layer_compression: Some(tenant_conf.image_layer_compression),
            eviction_policy: Some(tenant_conf.eviction_policy),
             min_resident_size_override: tenant_conf.min_resident_size_override,
             evictions_low_residence_duration_metric_threshold: Some(

View File

@@ -11,13 +11,16 @@
 //! len <  128: 0XXXXXXX
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
 
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::tenant::block_io::BlockCursor;
 use crate::virtual_file::VirtualFile;
+use lz4_flex;
+use pageserver_api::models::CompressionAlgorithm;
+use postgres_ffi::BLCKSZ;
 use std::cmp::min;
 use std::io::{Error, ErrorKind};
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
         Ok(buf)
     }
+
+    /// Read a blob that starts with a compression-algorithm byte,
+    /// decompressing the payload if it was stored compressed.
+    pub async fn read_compressed_blob(
+        &self,
+        offset: u64,
+        ctx: &RequestContext,
+    ) -> Result<Vec<u8>, std::io::Error> {
+        let blknum = (offset / PAGE_SZ as u64) as u32;
+        let off = (offset % PAGE_SZ as u64) as usize;
+        let buf = self.read_blk(blknum, ctx).await?;
+        // The first byte identifies the compression algorithm; the length
+        // header and payload follow it.
+        let compression_alg = CompressionAlgorithm::from_repr(buf[off]).unwrap();
+        let res = self.read_blob(offset + 1, ctx).await?;
+        if compression_alg == CompressionAlgorithm::LZ4 {
+            lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
+                std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
+            })
+        } else {
+            assert_eq!(compression_alg, CompressionAlgorithm::NoCompression);
+            Ok(res)
+        }
+    }
+
     /// Read blob into the given buffer. Any previous contents in the buffer
     /// are overwritten.
     pub async fn read_blob_into_buf(
@@ -211,6 +237,61 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         (src_buf, Ok(()))
     }
 
+    /// Write a blob, prefixing it with a byte identifying the compression
+    /// algorithm. The blob is stored compressed only when that saves space.
+    pub async fn write_compressed_blob(
+        &mut self,
+        srcbuf: Bytes,
+        compression: CompressionAlgorithm,
+    ) -> Result<u64, Error> {
+        let offset = self.offset;
+        let len = srcbuf.len();
+
+        let mut io_buf = self.io_buf.take().expect("we always put it back below");
+        io_buf.clear();
+        let mut is_compressed = false;
+        if len < 128 {
+            // Short blob. Write a 1-byte length header; short blobs are
+            // never compressed.
+            io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
+            io_buf.put_u8(len as u8);
+        } else {
+            // Write a 4-byte length header
+            if len > 0x7fff_ffff {
+                return Err(Error::new(
+                    ErrorKind::Other,
+                    format!("blob too large ({} bytes)", len),
+                ));
+            }
+            if compression == CompressionAlgorithm::LZ4 && len == BLCKSZ as usize {
+                let compressed = lz4_flex::block::compress(&srcbuf);
+                // Store the compressed image only if it is actually smaller.
+                if compressed.len() < len {
+                    io_buf.put_u8(compression as u8);
+                    let mut len_buf = (compressed.len() as u32).to_be_bytes();
+                    len_buf[0] |= 0x80;
+                    io_buf.extend_from_slice(&len_buf[..]);
+                    io_buf.extend_from_slice(&compressed[..]);
+                    is_compressed = true;
+                }
+            }
+            if !is_compressed {
+                io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
+                let mut len_buf = (len as u32).to_be_bytes();
+                len_buf[0] |= 0x80;
+                io_buf.extend_from_slice(&len_buf[..]);
+            }
+        }
+        let (io_buf, hdr_res) = self.write_all(io_buf).await;
+        // Put the buffer back before propagating any header write error.
+        self.io_buf = Some(io_buf);
+        hdr_res?;
+        if is_compressed {
+            // The compressed payload was already written together with the header.
+            Ok(offset)
+        } else {
+            let (_buf, res) = self.write_all(srcbuf).await;
+            res.map(|_| offset)
+        }
+    }
+
     /// Write a blob of data. Returns the offset that it was written to,
     /// which can be used to retrieve the data later.
     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +308,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
             if len < 128 {
                 // Short blob. Write a 1-byte length header
                 io_buf.put_u8(len as u8);
-                self.write_all(io_buf).await
             } else {
                 // Write a 4-byte length header
                 if len > 0x7fff_ffff {
@@ -242,8 +322,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
                 let mut len_buf = (len as u32).to_be_bytes();
                 len_buf[0] |= 0x80;
                 io_buf.extend_from_slice(&len_buf[..]);
-                self.write_all(io_buf).await
             }
+            self.write_all(io_buf).await
         }
         .await;
         self.io_buf = Some(io_buf);

View File

@@ -9,9 +9,9 @@
 //! may lead to a data loss.
 //!
 use anyhow::bail;
-use pageserver_api::models::CompactionAlgorithm;
-use pageserver_api::models::EvictionPolicy;
-use pageserver_api::models::{self, ThrottleConfig};
+use pageserver_api::models::{
+    self, CompactionAlgorithm, CompressionAlgorithm, EvictionPolicy, ThrottleConfig,
+};
 use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
 use serde::de::IntoDeserializer;
 use serde::{Deserialize, Serialize};
@@ -41,6 +41,9 @@ pub mod defaults {
     pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
         super::CompactionAlgorithm::Legacy;
 
+    pub const DEFAULT_COMPRESSION_ALGORITHM: super::CompressionAlgorithm =
+        super::CompressionAlgorithm::LZ4;
+
     pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
 
     // Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -345,6 +348,7 @@ pub struct TenantConf {
     /// to avoid eager reconnects.
     pub max_lsn_wal_lag: NonZeroU64,
     pub trace_read_requests: bool,
+    pub image_layer_compression: CompressionAlgorithm,
     pub eviction_policy: EvictionPolicy,
     pub min_resident_size_override: Option<u64>,
     // See the corresponding metric's help string.
@@ -429,6 +433,10 @@ pub struct TenantConfOpt {
     #[serde(default)]
     pub trace_read_requests: Option<bool>,
 
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub image_layer_compression: Option<CompressionAlgorithm>,
+
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
     pub eviction_policy: Option<EvictionPolicy>,
@@ -492,6 +500,9 @@ impl TenantConfOpt {
             trace_read_requests: self
                 .trace_read_requests
                 .unwrap_or(global_conf.trace_read_requests),
+            image_layer_compression: self
+                .image_layer_compression
+                .unwrap_or(global_conf.image_layer_compression),
             eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
             min_resident_size_override: self
                 .min_resident_size_override
@@ -538,6 +549,7 @@ impl Default for TenantConf {
             max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
                 .expect("cannot parse default max walreceiver Lsn wal lag"),
             trace_read_requests: false,
+            image_layer_compression: DEFAULT_COMPRESSION_ALGORITHM,
             eviction_policy: EvictionPolicy::NoEviction,
             min_resident_size_override: None,
             evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
@@ -612,6 +624,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
             lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
             max_lsn_wal_lag: value.max_lsn_wal_lag,
             trace_read_requests: value.trace_read_requests,
+            image_layer_compression: value.image_layer_compression,
             eviction_policy: value.eviction_policy,
             min_resident_size_override: value.min_resident_size_override,
             evictions_low_residence_duration_metric_threshold: value

View File

@@ -724,6 +724,8 @@ impl DeltaLayerInner {
             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
 
         if let Some(mut expected_summary) = summary {
+            // assume backward compatibility
+            expected_summary.format_version = actual_summary.format_version;
             // production code path
             expected_summary.index_start_blk = actual_summary.index_start_blk;
             expected_summary.index_root_blk = actual_summary.index_root_blk;
@@ -966,7 +968,7 @@ impl DeltaLayerInner {
         // track when a key is done.
         for read in reads.into_iter().rev() {
             let res = vectored_blob_reader
-                .read_blobs(&read, buf.take().expect("Should have a buffer"))
+                .read_blobs(&read, buf.take().expect("Should have a buffer"), false)
                 .await;
 
             let blobs_buf = match res {
View File

@@ -39,14 +39,17 @@ use crate::tenant::vectored_blob_io::{
 };
 use crate::tenant::{PageReconstructError, Timeline};
 use crate::virtual_file::{self, VirtualFile};
-use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
+use crate::{
+    COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
+};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
 use hex;
 use pageserver_api::keyspace::KeySpace;
-use pageserver_api::models::LayerAccessKind;
+use pageserver_api::models::{CompressionAlgorithm, LayerAccessKind};
 use pageserver_api::shard::TenantShardId;
+use postgres_ffi::BLCKSZ;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs::File;
@@ -153,6 +156,7 @@ pub struct ImageLayerInner {
     // values copied from summary
     index_start_blk: u32,
     index_root_blk: u32,
+    format_version: u16,
 
     lsn: Lsn,
@@ -167,6 +171,7 @@ impl std::fmt::Debug for ImageLayerInner {
         f.debug_struct("ImageLayerInner")
             .field("index_start_blk", &self.index_start_blk)
             .field("index_root_blk", &self.index_root_blk)
+            .field("format_version", &self.format_version)
             .finish()
     }
 }
@@ -391,7 +396,12 @@ impl ImageLayerInner {
         let actual_summary =
             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
 
+        if actual_summary.format_version > COMPRESSED_STORAGE_FORMAT_VERSION {
+            bail!("Forward compatibility of storage is not supported: current format version is {}, format version of layer {} is {}", COMPRESSED_STORAGE_FORMAT_VERSION, path, actual_summary.format_version);
+        }
         if let Some(mut expected_summary) = summary {
+            // assume backward compatibility
+            expected_summary.format_version = actual_summary.format_version;
             // production code path
             expected_summary.index_start_blk = actual_summary.index_start_blk;
             expected_summary.index_root_blk = actual_summary.index_root_blk;
@@ -408,6 +418,7 @@ impl ImageLayerInner {
         Ok(Ok(ImageLayerInner {
             index_start_blk: actual_summary.index_start_blk,
             index_root_blk: actual_summary.index_root_blk,
+            format_version: actual_summary.format_version,
             lsn,
             file,
             file_id,
@@ -436,18 +447,20 @@ impl ImageLayerInner {
         )
         .await?
         {
-            let blob = block_reader
-                .block_cursor()
-                .read_blob(
-                    offset,
-                    &RequestContextBuilder::extend(ctx)
-                        .page_content_kind(PageContentKind::ImageLayerValue)
-                        .build(),
-                )
-                .await
-                .with_context(|| format!("failed to read value from offset {}", offset))?;
-            let value = Bytes::from(blob);
+            let ctx = RequestContextBuilder::extend(ctx)
+                .page_content_kind(PageContentKind::ImageLayerValue)
+                .build();
+            let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
+                block_reader
+                    .block_cursor()
+                    .read_compressed_blob(offset, &ctx)
+                    .await
+            } else {
+                block_reader.block_cursor().read_blob(offset, &ctx).await
+            })
+            .with_context(|| format!("failed to read value from offset {}", offset))?;
+            let value = Bytes::from(blob);
 
             reconstruct_state.img = Some((self.lsn, value));
             Ok(ValueReconstructResult::Complete)
         } else {
@@ -539,9 +552,12 @@ impl ImageLayerInner {
             .into();
 
         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
+        let compressed_storage_format = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
         for read in reads.into_iter() {
             let buf = BytesMut::with_capacity(max_vectored_read_bytes);
-            let res = vectored_blob_reader.read_blobs(&read, buf).await;
+            let res = vectored_blob_reader
+                .read_blobs(&read, buf, compressed_storage_format)
+                .await;
 
             match res {
                 Ok(blobs_buf) => {
@@ -549,11 +565,31 @@ impl ImageLayerInner {
                     for meta in blobs_buf.blobs.iter() {
                         let img_buf = frozen_buf.slice(meta.start..meta.end);
-                        reconstruct_state.update_key(
-                            &meta.meta.key,
-                            self.lsn,
-                            Value::Image(img_buf),
-                        );
+                        if meta.compression_alg == CompressionAlgorithm::LZ4 {
+                            match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) {
+                                Ok(decompressed) => {
+                                    reconstruct_state.update_key(
+                                        &meta.meta.key,
+                                        self.lsn,
+                                        Value::Image(Bytes::from(decompressed)),
+                                    );
+                                }
+                                Err(err) => reconstruct_state.on_key_error(
+                                    meta.meta.key,
+                                    PageReconstructError::from(anyhow!(
+                                        "Failed to decompress blob from file {}: {}",
+                                        self.file.path,
+                                        err
+                                    )),
+                                ),
+                            }
+                        } else {
+                            reconstruct_state.update_key(
+                                &meta.meta.key,
+                                self.lsn,
+                                Value::Image(img_buf),
+                            );
+                        }
                     }
                 }
                 Err(err) => {
@@ -591,6 +627,7 @@ struct ImageLayerWriterInner {
     timeline_id: TimelineId,
     tenant_shard_id: TenantShardId,
     key_range: Range<Key>,
+    compression: CompressionAlgorithm,
     lsn: Lsn,
 
     blob_writer: BlobWriter<false>,
@@ -602,16 +639,17 @@ impl ImageLayerWriterInner {
     /// Start building a new image layer.
     ///
     async fn new(
-        conf: &'static PageServerConf,
-        timeline_id: TimelineId,
-        tenant_shard_id: TenantShardId,
+        timeline: &Arc<Timeline>,
         key_range: &Range<Key>,
         lsn: Lsn,
     ) -> anyhow::Result<Self> {
+        let timeline_id = timeline.timeline_id;
+        let tenant_shard_id = timeline.tenant_shard_id;
+        let compression = timeline.get_image_layer_compression();
         // Create the file initially with a temporary filename.
         // We'll atomically rename it to the final name when we're done.
         let path = ImageLayer::temp_path_for(
-            conf,
+            timeline.conf,
             timeline_id,
             tenant_shard_id,
             &ImageFileName {
@@ -638,11 +676,12 @@ impl ImageLayerWriterInner {
         let tree_builder = DiskBtreeBuilder::new(block_buf);
 
         let writer = Self {
-            conf,
+            conf: timeline.conf,
             path,
             timeline_id,
             tenant_shard_id,
             key_range: key_range.clone(),
+            compression,
             lsn,
             tree: tree_builder,
             blob_writer,
@@ -658,10 +697,15 @@ impl ImageLayerWriterInner {
     ///
     async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let (_img, res) = self.blob_writer.write_blob(img).await;
-        // TODO: re-use the buffer for `img` further upstack
-        let off = res?;
+        let off = if STORAGE_FORMAT_VERSION >= COMPRESSED_STORAGE_FORMAT_VERSION {
+            self.blob_writer
+                .write_compressed_blob(img, self.compression)
+                .await?
+        } else {
+            let (_img, res) = self.blob_writer.write_blob(img).await;
+            // TODO: re-use the buffer for `img` further upstack
+            res?
+        };
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
         self.tree.append(&keybuf, off)?;
@@ -766,17 +810,12 @@ impl ImageLayerWriter {
     /// Start building a new image layer.
     ///
     pub async fn new(
-        conf: &'static PageServerConf,
-        timeline_id: TimelineId,
-        tenant_shard_id: TenantShardId,
+        timeline: &Arc<Timeline>,
         key_range: &Range<Key>,
         lsn: Lsn,
     ) -> anyhow::Result<ImageLayerWriter> {
         Ok(Self {
-            inner: Some(
-                ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
-                    .await?,
-            ),
+            inner: Some(ImageLayerWriterInner::new(timeline, key_range, lsn).await?),
         })
     }

View File

@@ -19,8 +19,8 @@ use pageserver_api::{
     key::AUX_FILES_KEY,
     keyspace::KeySpaceAccum,
     models::{
-        CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
-        EvictionPolicy, LayerMapInfo, TimelineState,
+        CompactionAlgorithm, CompressionAlgorithm, DownloadRemoteLayersTaskInfo,
+        DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState,
     },
     reltag::BlockNumber,
     shard::{ShardIdentity, TenantShardId},
@@ -182,7 +182,7 @@ pub(crate) struct AuxFilesState {
 }
 
 pub struct Timeline {
-    conf: &'static PageServerConf,
+    pub(crate) conf: &'static PageServerConf,
     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
 
     myself: Weak<Self>,
@@ -1515,6 +1515,13 @@ impl Timeline {
             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
     }
 
+    pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
+        let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
+        tenant_conf
+            .image_layer_compression
+            .unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
+    }
+
     pub(super) fn tenant_conf_updated(&self) {
         // NB: Most tenant conf options are read by background loops, so,
         // changes will automatically be picked up.
@@ -3458,14 +3465,7 @@ impl Timeline {
                 continue;
             }
 
-            let mut image_layer_writer = ImageLayerWriter::new(
-                self.conf,
-                self.timeline_id,
-                self.tenant_shard_id,
-                &img_range,
-                lsn,
-            )
-            .await?;
+            let mut image_layer_writer = ImageLayerWriter::new(self, &img_range, lsn).await?;
 
             fail_point!("image-layer-writer-fail-before-finish", |_| {
                 Err(CreateImageLayersError::Other(anyhow::anyhow!(
Err(CreateImageLayersError::Other(anyhow::anyhow!( Err(CreateImageLayersError::Other(anyhow::anyhow!(

View File

@@ -997,14 +997,7 @@ impl TimelineAdaptor {
     ) -> Result<(), PageReconstructError> {
         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
 
-        let mut image_layer_writer = ImageLayerWriter::new(
-            self.timeline.conf,
-            self.timeline.timeline_id,
-            self.timeline.tenant_shard_id,
-            key_range,
-            lsn,
-        )
-        .await?;
+        let mut image_layer_writer = ImageLayerWriter::new(&self.timeline, key_range, lsn).await?;
 
         fail_point!("image-layer-writer-fail-before-finish", |_| {
             Err(PageReconstructError::Other(anyhow::anyhow!(

View File

@@ -15,11 +15,11 @@
 //!
 //! Note that the vectored blob api does *not* go through the page cache.
 
-use std::collections::BTreeMap;
-use std::num::NonZeroUsize;
-
 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use pageserver_api::models::CompressionAlgorithm;
+use std::collections::BTreeMap;
+use std::num::NonZeroUsize;
 use utils::lsn::Lsn;
 use utils::vec_map::VecMap;
@@ -40,6 +40,7 @@ pub struct VectoredBlob {
     pub start: usize,
     pub end: usize,
     pub meta: BlobMeta,
+    pub compression_alg: CompressionAlgorithm,
 }
 
 /// Return type of [`VectoredBlobReader::read_blobs`]
@@ -274,6 +275,7 @@ impl<'a> VectoredBlobReader<'a> {
         &self,
         read: &VectoredRead,
         buf: BytesMut,
+        compressed_storage_format: bool,
    ) -> Result<VectoredBlobsBuf, std::io::Error> {
         assert!(read.size() > 0);
         assert!(
@@ -304,35 +306,42 @@ impl<'a> VectoredBlobReader<'a> {
         );
 
         for ((offset, meta), next) in pairs {
-            let offset_in_buf = offset - start_offset;
-            let first_len_byte = buf[offset_in_buf as usize];
+            let mut offset_in_buf = (offset - start_offset) as usize;
+            let compression_alg = if compressed_storage_format {
+                offset_in_buf += 1;
+                CompressionAlgorithm::from_repr(buf[offset_in_buf - 1]).unwrap()
+            } else {
+                CompressionAlgorithm::NoCompression
+            };
+            let first_len_byte = buf[offset_in_buf];
             // Each blob is prefixed by a header containing its size.
             // Extract the size and skip that header to find the start of the data.
             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
             // 1 byte case and 1 in the 4 byte case.
             let (size_length, blob_size) = if first_len_byte < 0x80 {
-                (1, first_len_byte as u64)
+                (1usize, first_len_byte as usize)
             } else {
                 let mut blob_size_buf = [0u8; 4];
-                let offset_in_buf = offset_in_buf as usize;
                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
                 blob_size_buf[0] &= 0x7f;
-                (4, u32::from_be_bytes(blob_size_buf) as u64)
+                (4usize, u32::from_be_bytes(blob_size_buf) as usize)
             };
 
             let start = offset_in_buf + size_length;
             let end = match next {
-                Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
+                Some((next_blob_start_offset, _)) => {
+                    (next_blob_start_offset - start_offset) as usize
+                }
                 None => start + blob_size,
             };
 
             assert_eq!(end - start, blob_size);
 
             metas.push(VectoredBlob {
-                start: start as usize,
-                end: end as usize,
+                start,
+                end,
+                compression_alg,
                 meta: *meta,
             })
         }
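 
Note: the 1-byte/4-byte length header that read_blobs parses above (documented in blob_io.rs as 0XXXXXXX for short blobs and 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX for long ones) can be exercised on its own. A minimal sketch of the encoding, independent of any pageserver types:

/// Encode a blob length: one byte for len < 128, otherwise four big-endian
/// bytes with the most significant bit set.
fn encode_len(len: usize) -> Vec<u8> {
    if len < 128 {
        vec![len as u8]
    } else {
        assert!(len <= 0x7fff_ffff, "blob too large");
        let mut buf = (len as u32).to_be_bytes().to_vec();
        buf[0] |= 0x80;
        buf
    }
}

/// Decode a header at the start of `buf`, returning (header_size, blob_len).
fn decode_len(buf: &[u8]) -> (usize, usize) {
    if buf[0] < 0x80 {
        (1, buf[0] as usize)
    } else {
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[..4]);
        len_buf[0] &= 0x7f;
        (4, u32::from_be_bytes(len_buf) as usize)
    }
}

fn main() {
    for len in [0usize, 127, 128, 8192, 0x7fff_ffff] {
        let hdr = encode_len(len);
        assert_eq!(decode_len(&hdr), (hdr.len(), len));
    }
}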

View File

@@ -188,6 +188,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
             "max": 1000,
         },
         "trace_read_requests": True,
+        "image_layer_compression": "NoCompression",
         "walreceiver_connect_timeout": "13m",
     }

View File

@@ -0,0 +1,74 @@
+import os
+import time
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, PgBin
+
+
+#
+# Test image layer compression and log compression ratio
+#
+@pytest.mark.timeout(1000)
+def test_compression(neon_simple_env: NeonEnv, pg_bin: PgBin):
+    env = neon_simple_env
+
+    def calculate_layers_size(tenant, timeline):
+        timeline_path = "{}/tenants/{}/timelines/{}/".format(
+            env.pageserver.workdir, tenant, timeline
+        )
+        delta_total_size = 0
+        image_total_size = 0
+        for filename in os.listdir(timeline_path):
+            if filename.startswith("00000") and not filename.endswith(".___temp"):
+                size = os.path.getsize(timeline_path + filename)
+                # Delta layer names contain a "-" after the "__" separator
+                # (an LSN range); image layer names do not (a single LSN).
+                pos = filename.find("__")
+                if pos >= 0:
+                    pos = filename.find("-", pos)
+                    if pos >= 0:
+                        delta_total_size += size
+                    else:
+                        image_total_size += size
+        log.info(f"Image layers size: {image_total_size}, delta layers size: {delta_total_size}")
+        return image_total_size
+
+    tenant, timeline = env.neon_cli.create_tenant(
+        conf={
+            # Use aggressive compaction and checkpoint settings
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            "compaction_period": "1 s",
+            "compaction_threshold": "1",
+            "image_layer_compression": '"LZ4"',
+        }
+    )
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
+    connstr = endpoint.connstr()
+    log.info(f"Start a pgbench workload on pg {connstr}")
+    pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
+    pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])
+    time.sleep(5)  # wait some time to let background tasks complete at the pageserver
+    compressed_image_size = calculate_layers_size(tenant, timeline)
+
+    tenant, timeline = env.neon_cli.create_tenant(
+        conf={
+            # Use aggressive compaction and checkpoint settings
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            "compaction_period": "1 s",
+            "compaction_threshold": "1",
+            "image_layer_compression": '"NoCompression"',
+        }
+    )
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
+    connstr = endpoint.connstr()
+    log.info(f"Start a pgbench workload on pg {connstr}")
+    pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
+    pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])
+    time.sleep(5)  # wait some time to let background tasks complete at the pageserver
+    raw_image_size = calculate_layers_size(tenant, timeline)
+
+    log.info(f"Compression ratio: {raw_image_size/compressed_image_size}")