Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-05 11:40:37 +00:00)

Compare commits: 15 commits, `ktls` ... `image_laye`
| SHA1 |
|---|
| 70d1086e0f |
| 5a8e8baf9f |
| 57a4119a7b |
| aaef3789b0 |
| 0b57e0b8f2 |
| 485ecbaf8f |
| 0bcbce197a |
| 19d59e58d2 |
| ce65d13dbd |
| 18fefff026 |
| 2a69861896 |
| 98375b3896 |
| 8c60359ae5 |
| 8c7136b057 |
| 0df6c41eaa |
Cargo.lock: 10 changed lines (generated file)
@@ -2841,6 +2841,15 @@ version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 
+[[package]]
+name = "lz4_flex"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
+dependencies = [
+ "twox-hash",
+]
+
 [[package]]
 name = "match_cfg"
 version = "0.1.0"

@@ -3511,6 +3520,7 @@ dependencies = [
 "hyper",
 "itertools",
 "leaky-bucket",
+ "lz4_flex",
 "md5",
 "metrics",
 "nix 0.27.1",
@@ -100,6 +100,7 @@ jsonwebtoken = "9"
 lasso = "0.7"
 leaky-bucket = "1.0.1"
 libc = "0.2"
+lz4_flex = "0.11.1"
 md5 = "0.7.0"
 memoffset = "0.8"
 native-tls = "0.2"
@@ -384,6 +384,11 @@ impl PageServerNode {
 .map(|x| x.parse::<bool>())
 .transpose()
 .context("Failed to parse 'trace_read_requests' as bool")?,
+image_layer_compression: settings
+.remove("image_layer_compression")
+.map(serde_json::from_str)
+.transpose()
+.context("Failed to parse 'image_layer_compression' json")?,
 eviction_policy: settings
 .remove("eviction_policy")
 .map(serde_json::from_str)

@@ -496,6 +501,11 @@ impl PageServerNode {
 .map(|x| x.parse::<bool>())
 .transpose()
 .context("Failed to parse 'trace_read_requests' as bool")?,
+image_layer_compression: settings
+.remove("image_layer_compression")
+.map(serde_json::from_str)
+.transpose()
+.context("Failed to parse 'image_layer_compression' json")?,
 eviction_policy: settings
 .remove("eviction_policy")
 .map(serde_json::from_str)
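Both blocks above route the raw `image_layer_compression` setting through `serde_json::from_str`, so the value has to be a JSON string naming one of the enum variants introduced below. A minimal standalone sketch of that parsing, assuming `serde` (with the derive feature) and `serde_json` as dependencies and declaring its own copy of the enum rather than importing `pageserver_api`:

```rust
use serde::Deserialize;

// Standalone copy of the enum for illustration only; the real type lives in
// pageserver_api::models and carries more derives (Serialize, FromRepr, ...).
#[derive(Debug, PartialEq, Deserialize)]
enum CompressionAlgorithm {
    NoCompression,
    LZ4,
}

fn main() {
    // The setting value is parsed as JSON, so it has to be a quoted string
    // naming a variant.
    let parsed: CompressionAlgorithm = serde_json::from_str(r#""LZ4""#).unwrap();
    assert_eq!(parsed, CompressionAlgorithm::LZ4);

    // A bare, unquoted value is not valid JSON for this enum and is rejected.
    assert!(serde_json::from_str::<CompressionAlgorithm>("LZ4").is_err());
}
```

This is presumably also why the new regress test at the end of this diff writes the value as the Python string `'"LZ4"'`, with the JSON quotes embedded in it.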
@@ -294,6 +294,7 @@ pub struct TenantConfig {
 pub lagging_wal_timeout: Option<String>,
 pub max_lsn_wal_lag: Option<NonZeroU64>,
 pub trace_read_requests: Option<bool>,
+pub image_layer_compression: Option<CompressionAlgorithm>,
 pub eviction_policy: Option<EvictionPolicy>,
 pub min_resident_size_override: Option<u64>,
 pub evictions_low_residence_duration_metric_threshold: Option<String>,

@@ -327,6 +328,23 @@ pub enum CompactionAlgorithm {
 Tiered,
 }
 
+#[derive(
+Debug,
+Clone,
+Copy,
+PartialEq,
+Eq,
+Serialize,
+Deserialize,
+strum_macros::FromRepr,
+enum_map::Enum,
+)]
+#[repr(u8)]
+pub enum CompressionAlgorithm {
+NoCompression,
+LZ4,
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 pub struct EvictionPolicyLayerAccessThreshold {
 #[serde(with = "humantime_serde")]
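`CompressionAlgorithm` above is `#[repr(u8)]` and derives `strum_macros::FromRepr`, so the variant discriminant doubles as the one-byte tag written in front of each blob later in this diff (`NoCompression` = 0, `LZ4` = 1). A hand-rolled sketch of that mapping, with a hypothetical `from_repr` standing in for the derived one:

```rust
// Simplified stand-in for pageserver_api::models::CompressionAlgorithm,
// showing how the #[repr(u8)] discriminant works as an on-disk tag byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionAlgorithm {
    NoCompression, // discriminant 0
    LZ4,           // discriminant 1
}

// Hand-written equivalent of what strum_macros::FromRepr derives.
fn from_repr(tag: u8) -> Option<CompressionAlgorithm> {
    match tag {
        0 => Some(CompressionAlgorithm::NoCompression),
        1 => Some(CompressionAlgorithm::LZ4),
        _ => None,
    }
}

fn main() {
    let tag = CompressionAlgorithm::LZ4 as u8; // the cast uses the repr(u8) discriminant
    assert_eq!(tag, 1);
    assert_eq!(from_repr(tag), Some(CompressionAlgorithm::LZ4));
    assert_eq!(from_repr(0xff), None); // unknown tag bytes are rejected
}
```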
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
 leaky-bucket.workspace = true
+lz4_flex.workspace = true
 md5.workspace = true
 nix.workspace = true
 # hack to get the number of worker threads tokio uses
@@ -1538,6 +1538,7 @@ broker_endpoint = '{broker_endpoint}'
 
 let broker_endpoint = "http://127.0.0.1:7777";
 let trace_read_requests = true;
+let image_layer_compression = pageserver_api::models::CompressionAlgorithm::LZ4;
 
 let config_string = format!(
 r#"{ALL_BASE_VALUES_TOML}

@@ -1545,7 +1546,8 @@ pg_distrib_dir='{pg_distrib_dir}'
 broker_endpoint = '{broker_endpoint}'
 
 [tenant_config]
-trace_read_requests = {trace_read_requests}"#,
+trace_read_requests = {trace_read_requests}
+image_layer_compression = 'LZ4'"#,
 );
 
 let toml = config_string.parse()?;

@@ -1555,6 +1557,10 @@ trace_read_requests = {trace_read_requests}"#,
 conf.default_tenant_conf.trace_read_requests, trace_read_requests,
 "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
 );
+assert_eq!(
+conf.default_tenant_conf.image_layer_compression, image_layer_compression,
+"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
+);
 
 Ok(())
 }
@@ -1444,6 +1444,8 @@ components:
 type: integer
 trace_read_requests:
 type: boolean
+image_layer_compression:
+type: string
 heatmap_period:
 type: string
 TenantConfigResponse:
@@ -42,6 +42,9 @@ use tracing::info;
 /// backwards-compatible changes to the metadata format.
 pub const STORAGE_FORMAT_VERSION: u16 = 3;
 
+/// Minimal sorage format version with compression support
+pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
+
 pub const DEFAULT_PG_VERSION: u32 = 15;
 
 // Magic constants used to identify different kinds of files
@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
 use pageserver_api::models;
+use pageserver_api::models::CompressionAlgorithm;
 use pageserver_api::models::TimelineState;
 use pageserver_api::models::WalRedoManagerStatus;
 use pageserver_api::shard::ShardIdentity;

@@ -2288,6 +2289,13 @@ impl Tenant {
 .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
 }
 
+pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
+let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
+tenant_conf
+.image_layer_compression
+.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
+}
+
 pub fn get_min_resident_size_override(&self) -> Option<u64> {
 let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
 tenant_conf

@@ -3637,6 +3645,7 @@ pub(crate) mod harness {
 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
 trace_read_requests: Some(tenant_conf.trace_read_requests),
+image_layer_compression: Some(tenant_conf.image_layer_compression),
 eviction_policy: Some(tenant_conf.eviction_policy),
 min_resident_size_override: tenant_conf.min_resident_size_override,
 evictions_low_residence_duration_metric_threshold: Some(
@@ -11,13 +11,16 @@
 //! len < 128: 0XXXXXXX
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
 
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::tenant::block_io::BlockCursor;
 use crate::virtual_file::VirtualFile;
+use lz4_flex;
+use pageserver_api::models::CompressionAlgorithm;
+use postgres_ffi::BLCKSZ;
 use std::cmp::min;
 use std::io::{Error, ErrorKind};
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
 self.read_blob_into_buf(offset, &mut buf, ctx).await?;
 Ok(buf)
 }
+/// Read blob into the given buffer. Any previous contents in the buffer
+/// are overwritten.
+pub async fn read_compressed_blob(
+&self,
+offset: u64,
+ctx: &RequestContext,
+) -> Result<Vec<u8>, std::io::Error> {
+let blknum = (offset / PAGE_SZ as u64) as u32;
+let off = (offset % PAGE_SZ as u64) as usize;
+
+let buf = self.read_blk(blknum, ctx).await?;
+let compression_alg = CompressionAlgorithm::from_repr(buf[off]).unwrap();
+let res = self.read_blob(offset + 1, ctx).await?;
+if compression_alg == CompressionAlgorithm::LZ4 {
+lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
+std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
+})
+} else {
+assert_eq!(compression_alg, CompressionAlgorithm::NoCompression);
+Ok(res)
+}
+}
+
 /// Read blob into the given buffer. Any previous contents in the buffer
 /// are overwritten.
 pub async fn read_blob_into_buf(
@@ -211,6 +237,61 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 (src_buf, Ok(()))
 }
 
+pub async fn write_compressed_blob(
+&mut self,
+srcbuf: Bytes,
+compression: CompressionAlgorithm,
+) -> Result<u64, Error> {
+let offset = self.offset;
+let len = srcbuf.len();
+
+let mut io_buf = self.io_buf.take().expect("we always put it back below");
+io_buf.clear();
+let mut is_compressed = false;
+if len < 128 {
+// Short blob. Write a 1-byte length header
+io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
+io_buf.put_u8(len as u8);
+} else {
+// Write a 4-byte length header
+if len > 0x7fff_ffff {
+return Err(Error::new(
+ErrorKind::Other,
+format!("blob too large ({} bytes)", len),
+));
+}
+if compression == CompressionAlgorithm::LZ4 && len == BLCKSZ as usize {
+let compressed = lz4_flex::block::compress(&srcbuf);
+if compressed.len() < len {
+io_buf.put_u8(compression as u8);
+let mut len_buf = (compressed.len() as u32).to_be_bytes();
+len_buf[0] |= 0x80;
+io_buf.extend_from_slice(&len_buf[..]);
+io_buf.extend_from_slice(&compressed[..]);
+is_compressed = true;
+}
+}
+if !is_compressed {
+io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
+let mut len_buf = (len as u32).to_be_bytes();
+len_buf[0] |= 0x80;
+io_buf.extend_from_slice(&len_buf[..]);
+}
+}
+let (io_buf, hdr_res) = self.write_all(io_buf).await;
+match hdr_res {
+Ok(_) => (),
+Err(e) => return Err(e),
+}
+self.io_buf = Some(io_buf);
+if is_compressed {
+hdr_res.map(|_| offset)
+} else {
+let (_buf, res) = self.write_all(srcbuf).await;
+res.map(|_| offset)
+}
+}
+
 /// Write a blob of data. Returns the offset that it was written to,
 /// which can be used to retrieve the data later.
 pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +308,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 if len < 128 {
 // Short blob. Write a 1-byte length header
 io_buf.put_u8(len as u8);
-self.write_all(io_buf).await
 } else {
 // Write a 4-byte length header
 if len > 0x7fff_ffff {

@@ -242,8 +322,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 let mut len_buf = (len as u32).to_be_bytes();
 len_buf[0] |= 0x80;
 io_buf.extend_from_slice(&len_buf[..]);
-self.write_all(io_buf).await
 }
+self.write_all(io_buf).await
 }
 .await;
 self.io_buf = Some(io_buf);
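The module doc comment in the first hunk above describes the length header: a single byte `0XXXXXXX` when the blob is shorter than 128 bytes, otherwise four big-endian bytes with the high bit set, and `write_compressed_blob` puts a one-byte compression tag in front of either form. A standalone sketch of encoding and decoding that header (hypothetical helper functions, not code from the diff):

```rust
// Sketch of the blob header layout: a one-byte compression tag, then a length
// that is either a single byte (len < 128) or four big-endian bytes with the
// high bit set. Mirrors write_compressed_blob / read_blobs, but standalone.
fn encode_header(tag: u8, len: usize, out: &mut Vec<u8>) {
    out.push(tag); // 0 = NoCompression, 1 = LZ4
    if len < 128 {
        out.push(len as u8); // 0XXXXXXX
    } else {
        assert!(len <= 0x7fff_ffff, "blob too large");
        let mut len_buf = (len as u32).to_be_bytes();
        len_buf[0] |= 0x80; // 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
        out.extend_from_slice(&len_buf);
    }
}

// Returns (tag, blob length, header size in bytes).
fn decode_header(buf: &[u8]) -> (u8, usize, usize) {
    let tag = buf[0];
    let first_len_byte = buf[1];
    if first_len_byte < 0x80 {
        (tag, first_len_byte as usize, 2)
    } else {
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[1..5]);
        len_buf[0] &= 0x7f;
        (tag, u32::from_be_bytes(len_buf) as usize, 5)
    }
}

fn main() {
    let mut hdr = Vec::new();
    encode_header(1, 8192, &mut hdr); // an LZ4-tagged blob of one 8 KiB page
    assert_eq!(decode_header(&hdr), (1, 8192, 5));

    hdr.clear();
    encode_header(0, 42, &mut hdr); // a short, uncompressed blob
    assert_eq!(decode_header(&hdr), (0, 42, 2));
}
```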
@@ -9,9 +9,9 @@
 //! may lead to a data loss.
 //!
 use anyhow::bail;
-use pageserver_api::models::CompactionAlgorithm;
-use pageserver_api::models::EvictionPolicy;
-use pageserver_api::models::{self, ThrottleConfig};
+use pageserver_api::models::{
+self, CompactionAlgorithm, CompressionAlgorithm, EvictionPolicy, ThrottleConfig,
+};
 use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
 use serde::de::IntoDeserializer;
 use serde::{Deserialize, Serialize};

@@ -41,6 +41,9 @@ pub mod defaults {
 pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
 super::CompactionAlgorithm::Legacy;
 
+pub const DEFAULT_COMPRESSION_ALGORITHM: super::CompressionAlgorithm =
+super::CompressionAlgorithm::LZ4;
+
 pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
 
 // Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -345,6 +348,7 @@ pub struct TenantConf {
 /// to avoid eager reconnects.
 pub max_lsn_wal_lag: NonZeroU64,
 pub trace_read_requests: bool,
+pub image_layer_compression: CompressionAlgorithm,
 pub eviction_policy: EvictionPolicy,
 pub min_resident_size_override: Option<u64>,
 // See the corresponding metric's help string.

@@ -429,6 +433,10 @@ pub struct TenantConfOpt {
 #[serde(default)]
 pub trace_read_requests: Option<bool>,
 
+#[serde(skip_serializing_if = "Option::is_none")]
+#[serde(default)]
+pub image_layer_compression: Option<CompressionAlgorithm>,
+
 #[serde(skip_serializing_if = "Option::is_none")]
 #[serde(default)]
 pub eviction_policy: Option<EvictionPolicy>,

@@ -492,6 +500,9 @@ impl TenantConfOpt {
 trace_read_requests: self
 .trace_read_requests
 .unwrap_or(global_conf.trace_read_requests),
+image_layer_compression: self
+.image_layer_compression
+.unwrap_or(global_conf.image_layer_compression),
 eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
 min_resident_size_override: self
 .min_resident_size_override

@@ -538,6 +549,7 @@ impl Default for TenantConf {
 max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
 .expect("cannot parse default max walreceiver Lsn wal lag"),
 trace_read_requests: false,
+image_layer_compression: DEFAULT_COMPRESSION_ALGORITHM,
 eviction_policy: EvictionPolicy::NoEviction,
 min_resident_size_override: None,
 evictions_low_residence_duration_metric_threshold: humantime::parse_duration(

@@ -612,6 +624,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
 lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
 max_lsn_wal_lag: value.max_lsn_wal_lag,
 trace_read_requests: value.trace_read_requests,
+image_layer_compression: value.image_layer_compression,
 eviction_policy: value.eviction_policy,
 min_resident_size_override: value.min_resident_size_override,
 evictions_low_residence_duration_metric_threshold: value
@@ -724,6 +724,8 @@ impl DeltaLayerInner {
 Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
 
 if let Some(mut expected_summary) = summary {
+// assume backward compatibility
+expected_summary.format_version = actual_summary.format_version;
 // production code path
 expected_summary.index_start_blk = actual_summary.index_start_blk;
 expected_summary.index_root_blk = actual_summary.index_root_blk;

@@ -966,7 +968,7 @@ impl DeltaLayerInner {
 // track when a key is done.
 for read in reads.into_iter().rev() {
 let res = vectored_blob_reader
-.read_blobs(&read, buf.take().expect("Should have a buffer"))
+.read_blobs(&read, buf.take().expect("Should have a buffer"), false)
 .await;
 
 let blobs_buf = match res {
@@ -39,14 +39,17 @@ use crate::tenant::vectored_blob_io::{
 };
 use crate::tenant::{PageReconstructError, Timeline};
 use crate::virtual_file::{self, VirtualFile};
-use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
+use crate::{
+COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
+};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
 use hex;
 use pageserver_api::keyspace::KeySpace;
-use pageserver_api::models::LayerAccessKind;
+use pageserver_api::models::{CompressionAlgorithm, LayerAccessKind};
 use pageserver_api::shard::TenantShardId;
+use postgres_ffi::BLCKSZ;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs::File;
@@ -153,6 +156,7 @@ pub struct ImageLayerInner {
 // values copied from summary
 index_start_blk: u32,
 index_root_blk: u32,
+format_version: u16,
 
 lsn: Lsn,
 
@@ -167,6 +171,7 @@ impl std::fmt::Debug for ImageLayerInner {
 f.debug_struct("ImageLayerInner")
 .field("index_start_blk", &self.index_start_blk)
 .field("index_root_blk", &self.index_root_blk)
+.field("format_version", &self.format_version)
 .finish()
 }
 }
@@ -391,7 +396,12 @@ impl ImageLayerInner {
 let actual_summary =
 Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
 
+if actual_summary.format_version > COMPRESSED_STORAGE_FORMAT_VERSION {
+bail!("Forward compatibility of storage is not supported: current format version is {}, format version of layer {} is {}", COMPRESSED_STORAGE_FORMAT_VERSION, path, actual_summary.format_version);
+}
 if let Some(mut expected_summary) = summary {
+// assume backward compatibility
+expected_summary.format_version = actual_summary.format_version;
 // production code path
 expected_summary.index_start_blk = actual_summary.index_start_blk;
 expected_summary.index_root_blk = actual_summary.index_root_blk;

@@ -408,6 +418,7 @@ impl ImageLayerInner {
 Ok(Ok(ImageLayerInner {
 index_start_blk: actual_summary.index_start_blk,
 index_root_blk: actual_summary.index_root_blk,
+format_version: actual_summary.format_version,
 lsn,
 file,
 file_id,
@@ -436,18 +447,20 @@ impl ImageLayerInner {
 )
 .await?
 {
-let blob = block_reader
-.block_cursor()
-.read_blob(
-offset,
-&RequestContextBuilder::extend(ctx)
-.page_content_kind(PageContentKind::ImageLayerValue)
-.build(),
-)
-.await
-.with_context(|| format!("failed to read value from offset {}", offset))?;
-let value = Bytes::from(blob);
+let ctx = RequestContextBuilder::extend(ctx)
+.page_content_kind(PageContentKind::ImageLayerValue)
+.build();
+let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
+block_reader
+.block_cursor()
+.read_compressed_blob(offset, &ctx)
+.await
+} else {
+block_reader.block_cursor().read_blob(offset, &ctx).await
+})
+.with_context(|| format!("failed to read value from offset {}", offset))?;
+
+let value = Bytes::from(blob);
 reconstruct_state.img = Some((self.lsn, value));
 Ok(ValueReconstructResult::Complete)
 } else {
@@ -539,9 +552,12 @@ impl ImageLayerInner {
 .into();
 
 let vectored_blob_reader = VectoredBlobReader::new(&self.file);
+let compressed_storage_format = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
 for read in reads.into_iter() {
 let buf = BytesMut::with_capacity(max_vectored_read_bytes);
-let res = vectored_blob_reader.read_blobs(&read, buf).await;
+let res = vectored_blob_reader
+.read_blobs(&read, buf, compressed_storage_format)
+.await;
 
 match res {
 Ok(blobs_buf) => {

@@ -549,11 +565,31 @@ impl ImageLayerInner {
 
 for meta in blobs_buf.blobs.iter() {
 let img_buf = frozen_buf.slice(meta.start..meta.end);
-reconstruct_state.update_key(
-&meta.meta.key,
-self.lsn,
-Value::Image(img_buf),
-);
+if meta.compression_alg == CompressionAlgorithm::LZ4 {
+match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) {
+Ok(decompressed) => {
+reconstruct_state.update_key(
+&meta.meta.key,
+self.lsn,
+Value::Image(Bytes::from(decompressed)),
+);
+}
+Err(err) => reconstruct_state.on_key_error(
+meta.meta.key,
+PageReconstructError::from(anyhow!(
+"Failed to decompress blob from file {}: {}",
+self.file.path,
+err
+)),
+),
+}
+} else {
+reconstruct_state.update_key(
+&meta.meta.key,
+self.lsn,
+Value::Image(img_buf),
+);
+}
 }
 }
 Err(err) => {
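The writer keeps the LZ4 output only when it is smaller than the raw page, and the read path above decompresses with the uncompressed size pinned to `BLCKSZ`. A small round-trip sketch of that rule with `lz4_flex` (the crate added in Cargo.toml earlier; `BLCKSZ` is redefined locally here under the assumption of 8 KiB pages):

```rust
// Round trip of one 8 KiB page through lz4_flex block compression,
// mirroring the "keep the compressed form only if it is smaller" rule
// used by write_compressed_blob above.
const BLCKSZ: usize = 8192; // same value postgres_ffi::BLCKSZ provides

fn main() {
    // A highly compressible page: one repeated byte.
    let page = vec![0xAAu8; BLCKSZ];

    // Writer side: try LZ4, keep it only if it actually saves space.
    let compressed = lz4_flex::block::compress(&page);
    let (stored, is_compressed) = if compressed.len() < page.len() {
        (compressed, true)
    } else {
        (page.clone(), false) // incompressible data is written uncompressed
    };

    // Reader side: decompress only when the tag byte said LZ4; the
    // uncompressed size is known to be exactly one block.
    let restored = if is_compressed {
        lz4_flex::block::decompress(&stored, BLCKSZ).unwrap()
    } else {
        stored
    };
    assert_eq!(restored, page);
}
```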
@@ -591,6 +627,7 @@ struct ImageLayerWriterInner {
 timeline_id: TimelineId,
 tenant_shard_id: TenantShardId,
 key_range: Range<Key>,
+compression: CompressionAlgorithm,
 lsn: Lsn,
 
 blob_writer: BlobWriter<false>,

@@ -602,16 +639,17 @@ impl ImageLayerWriterInner {
 /// Start building a new image layer.
 ///
 async fn new(
-conf: &'static PageServerConf,
-timeline_id: TimelineId,
-tenant_shard_id: TenantShardId,
+timeline: &Arc<Timeline>,
 key_range: &Range<Key>,
 lsn: Lsn,
 ) -> anyhow::Result<Self> {
+let timeline_id = timeline.timeline_id;
+let tenant_shard_id = timeline.tenant_shard_id;
+let compression = timeline.get_image_layer_compression();
 // Create the file initially with a temporary filename.
 // We'll atomically rename it to the final name when we're done.
 let path = ImageLayer::temp_path_for(
-conf,
+timeline.conf,
 timeline_id,
 tenant_shard_id,
 &ImageFileName {

@@ -638,11 +676,12 @@ impl ImageLayerWriterInner {
 let tree_builder = DiskBtreeBuilder::new(block_buf);
 
 let writer = Self {
-conf,
+conf: timeline.conf,
 path,
 timeline_id,
 tenant_shard_id,
 key_range: key_range.clone(),
+compression,
 lsn,
 tree: tree_builder,
 blob_writer,
@@ -658,10 +697,15 @@ impl ImageLayerWriterInner {
 ///
 async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
 ensure!(self.key_range.contains(&key));
-let (_img, res) = self.blob_writer.write_blob(img).await;
-// TODO: re-use the buffer for `img` further upstack
-let off = res?;
+let off = if STORAGE_FORMAT_VERSION >= COMPRESSED_STORAGE_FORMAT_VERSION {
+self.blob_writer
+.write_compressed_blob(img, self.compression)
+.await?
+} else {
+let (_img, res) = self.blob_writer.write_blob(img).await;
+// TODO: re-use the buffer for `img` further upstack
+res?
+};
 let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
 key.write_to_byte_slice(&mut keybuf);
 self.tree.append(&keybuf, off)?;

@@ -766,17 +810,12 @@ impl ImageLayerWriter {
 /// Start building a new image layer.
 ///
 pub async fn new(
-conf: &'static PageServerConf,
-timeline_id: TimelineId,
-tenant_shard_id: TenantShardId,
+timeline: &Arc<Timeline>,
 key_range: &Range<Key>,
 lsn: Lsn,
 ) -> anyhow::Result<ImageLayerWriter> {
 Ok(Self {
-inner: Some(
-ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
-.await?,
-),
+inner: Some(ImageLayerWriterInner::new(timeline, key_range, lsn).await?),
 })
 }
 
@@ -19,8 +19,8 @@ use pageserver_api::{
 key::AUX_FILES_KEY,
 keyspace::KeySpaceAccum,
 models::{
-CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
-EvictionPolicy, LayerMapInfo, TimelineState,
+CompactionAlgorithm, CompressionAlgorithm, DownloadRemoteLayersTaskInfo,
+DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState,
 },
 reltag::BlockNumber,
 shard::{ShardIdentity, TenantShardId},

@@ -182,7 +182,7 @@ pub(crate) struct AuxFilesState {
 }
 
 pub struct Timeline {
-conf: &'static PageServerConf,
+pub(crate) conf: &'static PageServerConf,
 tenant_conf: Arc<RwLock<AttachedTenantConf>>,
 
 myself: Weak<Self>,
@@ -1515,6 +1515,13 @@ impl Timeline {
 .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
 }
 
+pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
+let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
+tenant_conf
+.image_layer_compression
+.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
+}
+
 pub(super) fn tenant_conf_updated(&self) {
 // NB: Most tenant conf options are read by background loops, so,
 // changes will automatically be picked up.
@@ -3458,14 +3465,7 @@ impl Timeline {
 continue;
 }
 
-let mut image_layer_writer = ImageLayerWriter::new(
-self.conf,
-self.timeline_id,
-self.tenant_shard_id,
-&img_range,
-lsn,
-)
-.await?;
+let mut image_layer_writer = ImageLayerWriter::new(self, &img_range, lsn).await?;
 
 fail_point!("image-layer-writer-fail-before-finish", |_| {
 Err(CreateImageLayersError::Other(anyhow::anyhow!(
@@ -997,14 +997,7 @@ impl TimelineAdaptor {
 ) -> Result<(), PageReconstructError> {
 let timer = self.timeline.metrics.create_images_time_histo.start_timer();
 
-let mut image_layer_writer = ImageLayerWriter::new(
-self.timeline.conf,
-self.timeline.timeline_id,
-self.timeline.tenant_shard_id,
-key_range,
-lsn,
-)
-.await?;
+let mut image_layer_writer = ImageLayerWriter::new(&self.timeline, key_range, lsn).await?;
 
 fail_point!("image-layer-writer-fail-before-finish", |_| {
 Err(PageReconstructError::Other(anyhow::anyhow!(
@@ -15,11 +15,11 @@
 //!
 //! Note that the vectored blob api does *not* go through the page cache.
 
-use std::collections::BTreeMap;
-use std::num::NonZeroUsize;
-
 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use pageserver_api::models::CompressionAlgorithm;
+use std::collections::BTreeMap;
+use std::num::NonZeroUsize;
 use utils::lsn::Lsn;
 use utils::vec_map::VecMap;
 
@@ -40,6 +40,7 @@ pub struct VectoredBlob {
 pub start: usize,
 pub end: usize,
 pub meta: BlobMeta,
+pub compression_alg: CompressionAlgorithm,
 }
 
 /// Return type of [`VectoredBlobReader::read_blobs`]

@@ -274,6 +275,7 @@ impl<'a> VectoredBlobReader<'a> {
 &self,
 read: &VectoredRead,
 buf: BytesMut,
+compressed_storage_format: bool,
 ) -> Result<VectoredBlobsBuf, std::io::Error> {
 assert!(read.size() > 0);
 assert!(
@@ -304,35 +306,42 @@ impl<'a> VectoredBlobReader<'a> {
 );
 
 for ((offset, meta), next) in pairs {
-let offset_in_buf = offset - start_offset;
-let first_len_byte = buf[offset_in_buf as usize];
+let mut offset_in_buf = (offset - start_offset) as usize;
+let compression_alg = if compressed_storage_format {
+offset_in_buf += 1;
+CompressionAlgorithm::from_repr(buf[offset_in_buf - 1]).unwrap()
+} else {
+CompressionAlgorithm::NoCompression
+};
+let first_len_byte = buf[offset_in_buf];
+
 // Each blob is prefixed by a header containing it's size.
 // Extract the size and skip that header to find the start of the data.
 // The size can be 1 or 4 bytes. The most significant bit is 0 in the
 // 1 byte case and 1 in the 4 byte case.
 let (size_length, blob_size) = if first_len_byte < 0x80 {
-(1, first_len_byte as u64)
+(1usize, first_len_byte as usize)
 } else {
 let mut blob_size_buf = [0u8; 4];
-let offset_in_buf = offset_in_buf as usize;
-
 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
 blob_size_buf[0] &= 0x7f;
-(4, u32::from_be_bytes(blob_size_buf) as u64)
+(4usize, u32::from_be_bytes(blob_size_buf) as usize)
 };
 
 let start = offset_in_buf + size_length;
 let end = match next {
-Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
+Some((next_blob_start_offset, _)) => {
+(next_blob_start_offset - start_offset) as usize
+}
 None => start + blob_size,
 };
 
 assert_eq!(end - start, blob_size);
 
 metas.push(VectoredBlob {
-start: start as usize,
-end: end as usize,
+start,
+end,
+compression_alg,
 meta: *meta,
 })
 }
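With the new `compressed_storage_format` flag, `read_blobs` shifts each blob's header parse by one byte to pick up the compression tag before the usual 1- or 4-byte length. A standalone sketch of that per-blob header walk (hypothetical helper, dummy payload bytes, no actual decompression):

```rust
// Sketch of how read_blobs walks a vectored read buffer: with the compressed
// storage format each blob starts with a one-byte compression tag, then the
// usual 1- or 4-byte length header, then the payload.
fn parse_blob(buf: &[u8], mut off: usize, compressed_format: bool) -> (u8, usize, usize) {
    // Returns (tag, payload start, payload len).
    let tag = if compressed_format {
        off += 1;
        buf[off - 1]
    } else {
        0 // NoCompression
    };
    let first_len_byte = buf[off];
    let (size_length, len) = if first_len_byte < 0x80 {
        (1, first_len_byte as usize)
    } else {
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[off..off + 4]);
        len_buf[0] &= 0x7f;
        (4, u32::from_be_bytes(len_buf) as usize)
    };
    (tag, off + size_length, len)
}

fn main() {
    // Two blobs in one buffer: a 3-byte uncompressed blob ("abc") followed by
    // a 130-byte blob tagged as LZ4 (payload bytes are dummies here).
    let mut buf = vec![0u8, 3, b'a', b'b', b'c'];
    buf.extend_from_slice(&[1, 0x80, 0, 0, 130]);
    buf.extend_from_slice(&[0x55; 130]);

    let (tag, start, len) = parse_blob(&buf, 0, true);
    assert_eq!((tag, len, &buf[start..start + len]), (0, 3, &b"abc"[..]));

    let (tag, start, len) = parse_blob(&buf, 5, true);
    assert_eq!((tag, len), (1, 130));
    assert_eq!(start, 10);
}
```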
@@ -188,6 +188,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
 "max": 1000,
 },
 "trace_read_requests": True,
+"image_layer_compression": "NoCompression",
 "walreceiver_connect_timeout": "13m",
 }
 
test_runner/regress/test_compression.py: 74 lines (new file)

@@ -0,0 +1,74 @@
+import os
+import time
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, PgBin
+
+
+#
+# Test image layer compression and log compression ratio
+#
+@pytest.mark.timeout(1000)
+def test_compression(neon_simple_env: NeonEnv, pg_bin: PgBin):
+    env = neon_simple_env
+
+    def calculate_layers_size(tenant, timeline):
+        timeline_path = "{}/tenants/{}/timelines/{}/".format(
+            env.pageserver.workdir, tenant, timeline
+        )
+        delta_total_size = 0
+        image_total_size = 0
+        for filename in os.listdir(timeline_path):
+            if filename.startswith("00000") and not filename.endswith(".___temp"):
+                size = os.path.getsize(timeline_path + filename)
+                pos = filename.find("__")
+                if pos >= 0:
+                    pos = filename.find("-", pos)
+                    if pos >= 0:
+                        delta_total_size += size
+                    else:
+                        image_total_size += size
+        log.info(f"Image layers size: {image_total_size}, delta layers size: {delta_total_size}")
+        return image_total_size
+
+    tenant, timeline = env.neon_cli.create_tenant(
+        conf={
+            # Use aggressive compaction and checkpoint settings
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            "compaction_period": "1 s",
+            "compaction_threshold": "1",
+            "image_layer_compression": '"LZ4"',
+        }
+    )
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
+    connstr = endpoint.connstr()
+
+    log.info(f"Start a pgbench workload on pg {connstr}")
+    pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
+    pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])
+
+    time.sleep(5)  # wait sometime to let background tasks completed at PS
+    compressed_image_size = calculate_layers_size(tenant, timeline)
+
+    tenant, timeline = env.neon_cli.create_tenant(
+        conf={
+            # Use aggressive compaction and checkpoint settings
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            "compaction_period": "1 s",
+            "compaction_threshold": "1",
+            "image_layer_compression": '"NoCompression"',
+        }
+    )
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
+    connstr = endpoint.connstr()
+
+    log.info(f"Start a pgbench workload on pg {connstr}")
+    pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
+    pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])
+
+    time.sleep(5)  # wait sometime to let background tasks completed at PS
+    raw_image_size = calculate_layers_size(tenant, timeline)
+    log.info(f"Compression ratio: {raw_image_size/compressed_image_size}")