mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
Use CompressionAlgorithm enum
This commit is contained in:
@@ -384,11 +384,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
compress_image_layer: settings
|
||||
.remove("compress_image_layer")
|
||||
.map(|x| x.parse::<bool>())
|
||||
image_layer_compression: settings
|
||||
.remove("image_layer_compression")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'compress_image_layer' as bool")?,
|
||||
.context("Failed to parse 'image_layer_compression' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
@@ -501,11 +501,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
compress_image_layer: settings
|
||||
.remove("compress_image_layer")
|
||||
.map(|x| x.parse::<bool>())
|
||||
image_layer_compression: settings
|
||||
.remove("image_layer_compression")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'compress_image_layer' as bool")?,
|
||||
.context("Failed to parse 'image_layer_compression' json")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
|
||||
@@ -294,7 +294,7 @@ pub struct TenantConfig {
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
pub compress_image_layer: Option<bool>,
|
||||
pub image_layer_compression: Option<CompressionAlgorithm>,
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
@@ -328,6 +328,23 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
strum_macros::FromRepr,
|
||||
enum_map::Enum,
|
||||
)]
|
||||
#[repr(u8)]
|
||||
pub enum CompressionAlgorithm {
|
||||
NoCompression,
|
||||
LZ4,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct EvictionPolicyLayerAccessThreshold {
|
||||
#[serde(with = "humantime_serde")]
|
||||
|
||||
@@ -1538,7 +1538,7 @@ broker_endpoint = '{broker_endpoint}'
|
||||
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
let trace_read_requests = true;
|
||||
let compress_image_layer = true;
|
||||
let image_layer_compression = pageserver_api::models::CompressionAlgorithm::LZ4;
|
||||
|
||||
let config_string = format!(
|
||||
r#"{ALL_BASE_VALUES_TOML}
|
||||
@@ -1547,7 +1547,7 @@ broker_endpoint = '{broker_endpoint}'
|
||||
|
||||
[tenant_config]
|
||||
trace_read_requests = {trace_read_requests}
|
||||
compress_image_layer = {compress_image_layer}"#,
|
||||
image_layer_compression = 'LZ4'"#,
|
||||
);
|
||||
|
||||
let toml = config_string.parse()?;
|
||||
@@ -1558,7 +1558,7 @@ compress_image_layer = {compress_image_layer}"#,
|
||||
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
|
||||
);
|
||||
assert_eq!(
|
||||
conf.default_tenant_conf.compress_image_layer, compress_image_layer,
|
||||
conf.default_tenant_conf.image_layer_compression, image_layer_compression,
|
||||
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
|
||||
);
|
||||
|
||||
|
||||
@@ -1444,8 +1444,8 @@ components:
|
||||
type: integer
|
||||
trace_read_requests:
|
||||
type: boolean
|
||||
compress_image_layer:
|
||||
type: boolean
|
||||
image_layer_compression:
|
||||
type: string
|
||||
heatmap_period:
|
||||
type: string
|
||||
TenantConfigResponse:
|
||||
|
||||
@@ -45,10 +45,6 @@ pub const STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
/// Minimal sorage format version with compression support
|
||||
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
/// Page image compression algorithm
|
||||
pub const NO_COMPRESSION: u8 = 0;
|
||||
pub const LZ4_COMPRESSION: u8 = 1;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
|
||||
// Magic constants used to identify different kinds of files
|
||||
|
||||
@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::CompressionAlgorithm;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::models::WalRedoManagerStatus;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
@@ -2288,11 +2289,11 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_compress_image_layer(&self) -> bool {
|
||||
pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compress_image_layer
|
||||
.unwrap_or(self.conf.default_tenant_conf.compress_image_layer)
|
||||
.image_layer_compression
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
@@ -3644,7 +3645,7 @@ pub(crate) mod harness {
|
||||
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: Some(tenant_conf.trace_read_requests),
|
||||
compress_image_layer: Some(tenant_conf.compress_image_layer),
|
||||
image_layer_compression: Some(tenant_conf.image_layer_compression),
|
||||
eviction_policy: Some(tenant_conf.eviction_policy),
|
||||
min_resident_size_override: tenant_conf.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
|
||||
@@ -18,8 +18,8 @@ use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
|
||||
use lz4_flex;
|
||||
use pageserver_api::models::CompressionAlgorithm;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
@@ -46,14 +46,14 @@ impl<'a> BlockCursor<'a> {
|
||||
let off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let buf = self.read_blk(blknum, ctx).await?;
|
||||
let compression_alg = buf[off];
|
||||
let compression_alg = CompressionAlgorithm::from_repr(buf[off]).unwrap();
|
||||
let res = self.read_blob(offset + 1, ctx).await?;
|
||||
if compression_alg == LZ4_COMPRESSION {
|
||||
if compression_alg == CompressionAlgorithm::LZ4 {
|
||||
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
|
||||
})
|
||||
} else {
|
||||
assert_eq!(compression_alg, NO_COMPRESSION);
|
||||
assert_eq!(compression_alg, CompressionAlgorithm::NoCompression);
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
@@ -240,10 +240,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
pub async fn write_compressed_blob(
|
||||
&mut self,
|
||||
srcbuf: Bytes,
|
||||
new_format: bool,
|
||||
compression: CompressionAlgorithm,
|
||||
) -> Result<u64, Error> {
|
||||
let offset = self.offset;
|
||||
|
||||
let len = srcbuf.len();
|
||||
|
||||
let mut io_buf = self.io_buf.take().expect("we always put it back below");
|
||||
@@ -251,9 +250,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
let mut is_compressed = false;
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
if new_format {
|
||||
io_buf.put_u8(NO_COMPRESSION);
|
||||
}
|
||||
io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
|
||||
io_buf.put_u8(len as u8);
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
@@ -263,10 +260,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
format!("blob too large ({} bytes)", len),
|
||||
));
|
||||
}
|
||||
if new_format && len == BLCKSZ as usize {
|
||||
if compression == CompressionAlgorithm::LZ4 && len == BLCKSZ as usize {
|
||||
let compressed = lz4_flex::block::compress(&srcbuf);
|
||||
if compressed.len() < len {
|
||||
io_buf.put_u8(LZ4_COMPRESSION);
|
||||
io_buf.put_u8(compression as u8);
|
||||
let mut len_buf = (compressed.len() as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
@@ -275,9 +272,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
}
|
||||
}
|
||||
if !is_compressed {
|
||||
if new_format {
|
||||
io_buf.put_u8(NO_COMPRESSION);
|
||||
}
|
||||
io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
|
||||
@@ -9,9 +9,9 @@
|
||||
//! may lead to a data loss.
|
||||
//!
|
||||
use anyhow::bail;
|
||||
use pageserver_api::models::CompactionAlgorithm;
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
use pageserver_api::models::{self, ThrottleConfig};
|
||||
use pageserver_api::models::{
|
||||
self, CompactionAlgorithm, CompressionAlgorithm, EvictionPolicy, ThrottleConfig,
|
||||
};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -41,6 +41,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
|
||||
super::CompactionAlgorithm::Legacy;
|
||||
|
||||
pub const DEFAULT_COMPRESSION_ALGORITHM: super::CompressionAlgorithm =
|
||||
super::CompressionAlgorithm::LZ4;
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
|
||||
@@ -345,7 +348,7 @@ pub struct TenantConf {
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub trace_read_requests: bool,
|
||||
pub compress_image_layer: bool,
|
||||
pub image_layer_compression: CompressionAlgorithm,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
@@ -432,7 +435,7 @@ pub struct TenantConfOpt {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub compress_image_layer: Option<bool>,
|
||||
pub image_layer_compression: Option<CompressionAlgorithm>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
@@ -497,9 +500,9 @@ impl TenantConfOpt {
|
||||
trace_read_requests: self
|
||||
.trace_read_requests
|
||||
.unwrap_or(global_conf.trace_read_requests),
|
||||
compress_image_layer: self
|
||||
.compress_image_layer
|
||||
.unwrap_or(global_conf.compress_image_layer),
|
||||
image_layer_compression: self
|
||||
.image_layer_compression
|
||||
.unwrap_or(global_conf.image_layer_compression),
|
||||
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
|
||||
min_resident_size_override: self
|
||||
.min_resident_size_override
|
||||
@@ -546,7 +549,7 @@ impl Default for TenantConf {
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
trace_read_requests: false,
|
||||
compress_image_layer: true,
|
||||
image_layer_compression: DEFAULT_COMPRESSION_ALGORITHM,
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
@@ -621,7 +624,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
|
||||
max_lsn_wal_lag: value.max_lsn_wal_lag,
|
||||
trace_read_requests: value.trace_read_requests,
|
||||
compress_image_layer: value.compress_image_layer,
|
||||
image_layer_compression: value.image_layer_compression,
|
||||
eviction_policy: value.eviction_policy,
|
||||
min_resident_size_override: value.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: value
|
||||
|
||||
@@ -40,15 +40,14 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{
|
||||
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, LZ4_COMPRESSION, STORAGE_FORMAT_VERSION,
|
||||
TEMP_FILE_SUFFIX,
|
||||
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
|
||||
};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::models::{CompressionAlgorithm, LayerAccessKind};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
@@ -548,11 +547,11 @@ impl ImageLayerInner {
|
||||
.into();
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let compressed = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
|
||||
let compressed_storage_format = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
|
||||
for read in reads.into_iter() {
|
||||
let buf = BytesMut::with_capacity(max_vectored_read_bytes);
|
||||
let res = vectored_blob_reader
|
||||
.read_blobs(&read, buf, compressed)
|
||||
.read_blobs(&read, buf, compressed_storage_format)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
@@ -561,7 +560,7 @@ impl ImageLayerInner {
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
if meta.compression_alg == LZ4_COMPRESSION {
|
||||
if meta.compression_alg == CompressionAlgorithm::LZ4 {
|
||||
match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) {
|
||||
Ok(decompressed) => {
|
||||
reconstruct_state.update_key(
|
||||
@@ -623,7 +622,7 @@ struct ImageLayerWriterInner {
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: Range<Key>,
|
||||
compression: bool,
|
||||
compression: CompressionAlgorithm,
|
||||
lsn: Lsn,
|
||||
|
||||
blob_writer: BlobWriter<false>,
|
||||
|
||||
@@ -19,8 +19,8 @@ use pageserver_api::{
|
||||
key::AUX_FILES_KEY,
|
||||
keyspace::KeySpaceAccum,
|
||||
models::{
|
||||
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
EvictionPolicy, LayerMapInfo, TimelineState,
|
||||
CompactionAlgorithm, CompressionAlgorithm, DownloadRemoteLayersTaskInfo,
|
||||
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState,
|
||||
},
|
||||
reltag::BlockNumber,
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
@@ -1515,11 +1515,11 @@ impl Timeline {
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
pub fn get_image_layer_compression(&self) -> bool {
|
||||
pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compress_image_layer
|
||||
.unwrap_or(self.conf.default_tenant_conf.compress_image_layer)
|
||||
.image_layer_compression
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
|
||||
@@ -15,9 +15,9 @@
|
||||
//!
|
||||
//! Note that the vectored blob api does *not* go through the page cache.
|
||||
|
||||
use crate::NO_COMPRESSION;
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::CompressionAlgorithm;
|
||||
use std::collections::BTreeMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -40,7 +40,7 @@ pub struct VectoredBlob {
|
||||
pub start: usize,
|
||||
pub end: usize,
|
||||
pub meta: BlobMeta,
|
||||
pub compression_alg: u8,
|
||||
pub compression_alg: CompressionAlgorithm,
|
||||
}
|
||||
|
||||
/// Return type of [`VectoredBlobReader::read_blobs`]
|
||||
@@ -275,7 +275,7 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
&self,
|
||||
read: &VectoredRead,
|
||||
buf: BytesMut,
|
||||
new_storage_format: bool,
|
||||
compressed_storage_format: bool,
|
||||
) -> Result<VectoredBlobsBuf, std::io::Error> {
|
||||
assert!(read.size() > 0);
|
||||
assert!(
|
||||
@@ -307,11 +307,11 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
|
||||
for ((offset, meta), next) in pairs {
|
||||
let mut offset_in_buf = (offset - start_offset) as usize;
|
||||
let compression_alg = if new_storage_format {
|
||||
let compression_alg = if compressed_storage_format {
|
||||
offset_in_buf += 1;
|
||||
buf[offset_in_buf - 1]
|
||||
CompressionAlgorithm::from_repr(buf[offset_in_buf - 1]).unwrap()
|
||||
} else {
|
||||
NO_COMPRESSION
|
||||
CompressionAlgorithm::NoCompression
|
||||
};
|
||||
let first_len_byte = buf[offset_in_buf];
|
||||
|
||||
|
||||
Reference in New Issue
Block a user