From 19d59e58d2a3bd547605bdf8dc5a3c171b6989ae Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 13 Mar 2024 15:52:38 +0200 Subject: [PATCH] Use CompressionAlgorithm enum --- control_plane/src/pageserver.rs | 16 ++++++------- libs/pageserver_api/src/models.rs | 19 ++++++++++++++- pageserver/src/config.rs | 6 ++--- pageserver/src/http/openapi_spec.yml | 4 ++-- pageserver/src/lib.rs | 4 ---- pageserver/src/tenant.rs | 9 ++++---- pageserver/src/tenant/blob_io.rs | 23 ++++++----------- pageserver/src/tenant/config.rs | 23 +++++++++++-------- .../src/tenant/storage_layer/image_layer.rs | 13 +++++------ pageserver/src/tenant/timeline.rs | 10 ++++---- pageserver/src/tenant/vectored_blob_io.rs | 12 +++++----- 11 files changed, 75 insertions(+), 64 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index b9e226ebcd..e0111c214f 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -384,11 +384,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'trace_read_requests' as bool")?, - compress_image_layer: settings - .remove("compress_image_layer") - .map(|x| x.parse::()) + image_layer_compression: settings + .remove("image_layer_compression") + .map(serde_json::from_str) .transpose() - .context("Failed to parse 'compress_image_layer' as bool")?, + .context("Failed to parse 'image_layer_compression' json")?, eviction_policy: settings .remove("eviction_policy") .map(serde_json::from_str) @@ -501,11 +501,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'trace_read_requests' as bool")?, - compress_image_layer: settings - .remove("compress_image_layer") - .map(|x| x.parse::()) + image_layer_compression: settings + .remove("image_layer_compression") + .map(serde_json::from_str) .transpose() - .context("Failed to parse 'compress_image_layer' as bool")?, + .context("Failed to parse 'image_layer_compression' json")?, 
eviction_policy: settings .remove("eviction_policy") .map(serde_json::from_str) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 259c080483..6145ae4a9b 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -294,7 +294,7 @@ pub struct TenantConfig { pub lagging_wal_timeout: Option, pub max_lsn_wal_lag: Option, pub trace_read_requests: Option, - pub compress_image_layer: Option, + pub image_layer_compression: Option, pub eviction_policy: Option, pub min_resident_size_override: Option, pub evictions_low_residence_duration_metric_threshold: Option, @@ -328,6 +328,23 @@ pub enum CompactionAlgorithm { Tiered, } +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Serialize, + Deserialize, + strum_macros::FromRepr, + enum_map::Enum, +)] +#[repr(u8)] +pub enum CompressionAlgorithm { + NoCompression, + LZ4, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct EvictionPolicyLayerAccessThreshold { #[serde(with = "humantime_serde")] diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index fcd1e98b51..2985af66f9 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -1538,7 +1538,7 @@ broker_endpoint = '{broker_endpoint}' let broker_endpoint = "http://127.0.0.1:7777"; let trace_read_requests = true; - let compress_image_layer = true; + let image_layer_compression = pageserver_api::models::CompressionAlgorithm::LZ4; let config_string = format!( r#"{ALL_BASE_VALUES_TOML} @@ -1547,7 +1547,7 @@ broker_endpoint = '{broker_endpoint}' [tenant_config] trace_read_requests = {trace_read_requests} -compress_image_layer = {compress_image_layer}"#, +image_layer_compression = 'LZ4'"#, ); let toml = config_string.parse()?; @@ -1558,7 +1558,7 @@ compress_image_layer = {compress_image_layer}"#, "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants", ); assert_eq!( - 
conf.default_tenant_conf.compress_image_layer, compress_image_layer, + conf.default_tenant_conf.image_layer_compression, image_layer_compression, "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants", ); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 263f56a935..6367096ef2 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -1444,8 +1444,8 @@ components: type: integer trace_read_requests: type: boolean - compress_image_layer: - type: boolean + image_layer_compression: + type: string heatmap_period: type: string TenantConfigResponse: diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 83f549d7e8..c160851b40 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -45,10 +45,6 @@ pub const STORAGE_FORMAT_VERSION: u16 = 4; /// Minimal sorage format version with compression support pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4; -/// Page image compression algorithm -pub const NO_COMPRESSION: u8 = 0; -pub const LZ4_COMPRESSION: u8 = 1; - pub const DEFAULT_PG_VERSION: u32 = 15; // Magic constants used to identify different kinds of files diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 94b5b00194..bcaa092488 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use pageserver_api::models; +use pageserver_api::models::CompressionAlgorithm; use pageserver_api::models::TimelineState; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::ShardIdentity; @@ -2288,11 +2289,11 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.trace_read_requests) } - pub fn get_compress_image_layer(&self) -> bool { + pub fn get_image_layer_compression(&self) -> CompressionAlgorithm { let tenant_conf = 
self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf - .compress_image_layer - .unwrap_or(self.conf.default_tenant_conf.compress_image_layer) + .image_layer_compression + .unwrap_or(self.conf.default_tenant_conf.image_layer_compression) } pub fn get_min_resident_size_override(&self) -> Option { @@ -3644,7 +3645,7 @@ pub(crate) mod harness { lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout), max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag), trace_read_requests: Some(tenant_conf.trace_read_requests), - compress_image_layer: Some(tenant_conf.compress_image_layer), + image_layer_compression: Some(tenant_conf.image_layer_compression), eviction_policy: Some(tenant_conf.eviction_policy), min_resident_size_override: tenant_conf.min_resident_size_override, evictions_low_residence_duration_metric_threshold: Some( diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 5b6ce25781..032a251969 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -18,8 +18,8 @@ use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; use crate::virtual_file::VirtualFile; -use crate::{LZ4_COMPRESSION, NO_COMPRESSION}; use lz4_flex; +use pageserver_api::models::CompressionAlgorithm; use postgres_ffi::BLCKSZ; use std::cmp::min; use std::io::{Error, ErrorKind}; @@ -46,14 +46,14 @@ impl<'a> BlockCursor<'a> { let off = (offset % PAGE_SZ as u64) as usize; let buf = self.read_blk(blknum, ctx).await?; - let compression_alg = buf[off]; + let compression_alg = CompressionAlgorithm::from_repr(buf[off]).unwrap(); let res = self.read_blob(offset + 1, ctx).await?; - if compression_alg == LZ4_COMPRESSION { + if compression_alg == CompressionAlgorithm::LZ4 { lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| { std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error") }) } else { - assert_eq!(compression_alg, NO_COMPRESSION); + 
assert_eq!(compression_alg, CompressionAlgorithm::NoCompression); Ok(res) } } @@ -240,10 +240,9 @@ impl BlobWriter { pub async fn write_compressed_blob( &mut self, srcbuf: Bytes, - new_format: bool, + compression: CompressionAlgorithm, ) -> Result { let offset = self.offset; - let len = srcbuf.len(); let mut io_buf = self.io_buf.take().expect("we always put it back below"); @@ -251,9 +250,7 @@ impl BlobWriter { let mut is_compressed = false; if len < 128 { // Short blob. Write a 1-byte length header - if new_format { - io_buf.put_u8(NO_COMPRESSION); - } + io_buf.put_u8(CompressionAlgorithm::NoCompression as u8); io_buf.put_u8(len as u8); } else { // Write a 4-byte length header @@ -263,10 +260,10 @@ impl BlobWriter { format!("blob too large ({} bytes)", len), )); } - if new_format && len == BLCKSZ as usize { + if compression == CompressionAlgorithm::LZ4 && len == BLCKSZ as usize { let compressed = lz4_flex::block::compress(&srcbuf); if compressed.len() < len { - io_buf.put_u8(LZ4_COMPRESSION); + io_buf.put_u8(compression as u8); let mut len_buf = (compressed.len() as u32).to_be_bytes(); len_buf[0] |= 0x80; io_buf.extend_from_slice(&len_buf[..]); @@ -275,9 +272,7 @@ impl BlobWriter { } } if !is_compressed { - if new_format { - io_buf.put_u8(NO_COMPRESSION); - } + io_buf.put_u8(CompressionAlgorithm::NoCompression as u8); let mut len_buf = (len as u32).to_be_bytes(); len_buf[0] |= 0x80; io_buf.extend_from_slice(&len_buf[..]); diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index b8fab30c97..a4759e496e 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -9,9 +9,9 @@ //! may lead to a data loss. //! 
use anyhow::bail; -use pageserver_api::models::CompactionAlgorithm; -use pageserver_api::models::EvictionPolicy; -use pageserver_api::models::{self, ThrottleConfig}; +use pageserver_api::models::{ + self, CompactionAlgorithm, CompressionAlgorithm, EvictionPolicy, ThrottleConfig, +}; use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}; use serde::de::IntoDeserializer; use serde::{Deserialize, Serialize}; @@ -41,6 +41,9 @@ pub mod defaults { pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm = super::CompactionAlgorithm::Legacy; + pub const DEFAULT_COMPRESSION_ALGORITHM: super::CompressionAlgorithm = + super::CompressionAlgorithm::LZ4; + pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; // Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger. @@ -345,7 +348,7 @@ pub struct TenantConf { /// to avoid eager reconnects. pub max_lsn_wal_lag: NonZeroU64, pub trace_read_requests: bool, - pub compress_image_layer: bool, + pub image_layer_compression: CompressionAlgorithm, pub eviction_policy: EvictionPolicy, pub min_resident_size_override: Option, // See the corresponding metric's help string. 
@@ -432,7 +435,7 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] - pub compress_image_layer: Option, + pub image_layer_compression: Option, #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] @@ -497,9 +500,9 @@ impl TenantConfOpt { trace_read_requests: self .trace_read_requests .unwrap_or(global_conf.trace_read_requests), - compress_image_layer: self - .compress_image_layer - .unwrap_or(global_conf.compress_image_layer), + image_layer_compression: self + .image_layer_compression + .unwrap_or(global_conf.image_layer_compression), eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy), min_resident_size_override: self .min_resident_size_override @@ -546,7 +549,7 @@ impl Default for TenantConf { max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG) .expect("cannot parse default max walreceiver Lsn wal lag"), trace_read_requests: false, - compress_image_layer: true, + image_layer_compression: DEFAULT_COMPRESSION_ALGORITHM, eviction_policy: EvictionPolicy::NoEviction, min_resident_size_override: None, evictions_low_residence_duration_metric_threshold: humantime::parse_duration( @@ -621,7 +624,7 @@ impl From for models::TenantConfig { lagging_wal_timeout: value.lagging_wal_timeout.map(humantime), max_lsn_wal_lag: value.max_lsn_wal_lag, trace_read_requests: value.trace_read_requests, - compress_image_layer: value.compress_image_layer, + image_layer_compression: value.image_layer_compression, eviction_policy: value.eviction_policy, min_resident_size_override: value.min_resident_size_override, evictions_low_residence_duration_metric_threshold: value diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 51cac59b26..072ecce30f 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -40,15 +40,14 @@ use crate::tenant::vectored_blob_io::{ 
use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{ - COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, LZ4_COMPRESSION, STORAGE_FORMAT_VERSION, - TEMP_FILE_SUFFIX, + COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX, }; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use hex; use pageserver_api::keyspace::KeySpace; -use pageserver_api::models::LayerAccessKind; +use pageserver_api::models::{CompressionAlgorithm, LayerAccessKind}; use pageserver_api::shard::TenantShardId; use postgres_ffi::BLCKSZ; use rand::{distributions::Alphanumeric, Rng}; @@ -548,11 +547,11 @@ impl ImageLayerInner { .into(); let vectored_blob_reader = VectoredBlobReader::new(&self.file); - let compressed = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION; + let compressed_storage_format = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION; for read in reads.into_iter() { let buf = BytesMut::with_capacity(max_vectored_read_bytes); let res = vectored_blob_reader - .read_blobs(&read, buf, compressed) + .read_blobs(&read, buf, compressed_storage_format) .await; match res { @@ -561,7 +560,7 @@ impl ImageLayerInner { for meta in blobs_buf.blobs.iter() { let img_buf = frozen_buf.slice(meta.start..meta.end); - if meta.compression_alg == LZ4_COMPRESSION { + if meta.compression_alg == CompressionAlgorithm::LZ4 { match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) { Ok(decompressed) => { reconstruct_state.update_key( @@ -623,7 +622,7 @@ struct ImageLayerWriterInner { timeline_id: TimelineId, tenant_shard_id: TenantShardId, key_range: Range, - compression: bool, + compression: CompressionAlgorithm, lsn: Lsn, blob_writer: BlobWriter, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ce255e216f..3a9662356e 100644 --- a/pageserver/src/tenant/timeline.rs +++ 
b/pageserver/src/tenant/timeline.rs @@ -19,8 +19,8 @@ use pageserver_api::{ key::AUX_FILES_KEY, keyspace::KeySpaceAccum, models::{ - CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, - EvictionPolicy, LayerMapInfo, TimelineState, + CompactionAlgorithm, CompressionAlgorithm, DownloadRemoteLayersTaskInfo, + DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState, }, reltag::BlockNumber, shard::{ShardIdentity, TenantShardId}, @@ -1515,11 +1515,11 @@ impl Timeline { .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold) } - pub fn get_image_layer_compression(&self) -> bool { + pub fn get_image_layer_compression(&self) -> CompressionAlgorithm { let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf - .compress_image_layer - .unwrap_or(self.conf.default_tenant_conf.compress_image_layer) + .image_layer_compression + .unwrap_or(self.conf.default_tenant_conf.image_layer_compression) } pub(super) fn tenant_conf_updated(&self) { diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 17e618b4e0..3fbfec0ccc 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -15,9 +15,9 @@ //! //! Note that the vectored blob api does *not* go through the page cache. 
-use crate::NO_COMPRESSION; use bytes::BytesMut; use pageserver_api::key::Key; +use pageserver_api::models::CompressionAlgorithm; use std::collections::BTreeMap; use std::num::NonZeroUsize; use utils::lsn::Lsn; @@ -40,7 +40,7 @@ pub struct VectoredBlob { pub start: usize, pub end: usize, pub meta: BlobMeta, - pub compression_alg: u8, + pub compression_alg: CompressionAlgorithm, } /// Return type of [`VectoredBlobReader::read_blobs`] @@ -275,7 +275,7 @@ impl<'a> VectoredBlobReader<'a> { &self, read: &VectoredRead, buf: BytesMut, - new_storage_format: bool, + compressed_storage_format: bool, ) -> Result { assert!(read.size() > 0); assert!( @@ -307,11 +307,11 @@ impl<'a> VectoredBlobReader<'a> { for ((offset, meta), next) in pairs { let mut offset_in_buf = (offset - start_offset) as usize; - let compression_alg = if new_storage_format { + let compression_alg = if compressed_storage_format { offset_in_buf += 1; - buf[offset_in_buf - 1] + CompressionAlgorithm::from_repr(buf[offset_in_buf - 1]).unwrap() } else { - NO_COMPRESSION + CompressionAlgorithm::NoCompression }; let first_len_byte = buf[offset_in_buf];