From 572b3f48cf1fb1217efc8067fde2597f38dfa447 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 1 Apr 2022 19:40:39 +0300 Subject: [PATCH] Add compaction_target_size parameter --- pageserver/src/config.rs | 27 +++++++++++++++++++++++++++ pageserver/src/keyspace.rs | 3 --- pageserver/src/layered_repository.rs | 3 ++- pageserver/src/pgdatadir_mapping.rs | 8 ++++---- 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 9f7cd34a7a..0d5cac8b4f 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -30,8 +30,13 @@ pub mod defaults { // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. + // This parameter actually determines L0 layer file size. pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; + // Target file size, when creating image and delta layers. + // This parameter determines L1 layer file size. + pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024; + pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s"; pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; @@ -58,6 +63,7 @@ pub mod defaults { #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}' #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes +#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes #compaction_period = '{DEFAULT_COMPACTION_PERIOD}' #gc_period = '{DEFAULT_GC_PERIOD}' @@ -91,8 +97,13 @@ pub struct PageServerConf { // Flush out an inmemory layer, if it's holding WAL older than this // This puts a backstop on how much WAL needs to be re-digested if the // page server crashes. + // This parameter actually determines L0 layer file size. pub checkpoint_distance: u64, + // Target file size, when creating image and delta layers. + // This parameter determines L1 layer file size. + pub compaction_target_size: u64, + // How often to check if there's compaction work to be done. pub compaction_period: Duration, @@ -149,6 +160,7 @@ struct PageServerConfigBuilder { checkpoint_distance: BuilderValue, + compaction_target_size: BuilderValue, compaction_period: BuilderValue, gc_horizon: BuilderValue, @@ -183,6 +195,7 @@ impl Default for PageServerConfigBuilder { listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()), listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()), checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE), + compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE), compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD) .expect("cannot parse default compaction period")), gc_horizon: Set(DEFAULT_GC_HORIZON), @@ -220,6 +233,10 @@ impl PageServerConfigBuilder { self.checkpoint_distance = BuilderValue::Set(checkpoint_distance) } + pub fn compaction_target_size(&mut self, compaction_target_size: u64) { + self.compaction_target_size = BuilderValue::Set(compaction_target_size) + } + pub fn compaction_period(&mut self, compaction_period: Duration) { self.compaction_period = BuilderValue::Set(compaction_period) } @@ -290,6 +307,9 @@ impl PageServerConfigBuilder { checkpoint_distance: self .checkpoint_distance .ok_or(anyhow::anyhow!("missing checkpoint_distance"))?, + compaction_target_size: self + .compaction_target_size + .ok_or(anyhow::anyhow!("missing compaction_target_size"))?, compaction_period: self .compaction_period .ok_or(anyhow::anyhow!("missing compaction_period"))?, @@ -429,6 +449,9 @@ impl PageServerConf { "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?), "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?), "checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?), + "compaction_target_size" => { + builder.compaction_target_size(parse_toml_u64(key, item)?) + } "compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?), "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?), "gc_period" => builder.gc_period(parse_toml_duration(key, item)?), @@ -565,6 +588,7 @@ impl PageServerConf { PageServerConf { id: ZNodeId(0), checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, + compaction_target_size: 4 * 1024 * 1024, compaction_period: Duration::from_secs(10), gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(10), @@ -636,6 +660,7 @@ listen_http_addr = '127.0.0.1:9898' checkpoint_distance = 111 # in bytes +compaction_target_size = 111 # in bytes compaction_period = '111 s' gc_period = '222 s' @@ -673,6 +698,7 @@ id = 10 listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, + compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE, compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?, gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?, @@ -717,6 +743,7 @@ id = 10 listen_pg_addr: "127.0.0.1:64000".to_string(), listen_http_addr: "127.0.0.1:9898".to_string(), checkpoint_distance: 111, + compaction_target_size: 111, compaction_period: Duration::from_secs(111), gc_horizon: 222, gc_period: Duration::from_secs(222), diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs index 9973568b07..f6f0d7b7cf 100644 --- a/pageserver/src/keyspace.rs +++ b/pageserver/src/keyspace.rs @@ -2,9 +2,6 @@ use crate::repository::{key_range_size, singleton_range, Key}; use postgres_ffi::pg_constants; use std::ops::Range; -// Target file size, when creating image and delta layers -pub const TARGET_FILE_SIZE_BYTES: u64 = 128 * 1024 * 1024; // 128 MB - /// /// Represents a set of Keys, in a compact form. /// diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 60b0e921ce..2d9b680624 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1581,7 +1581,8 @@ impl LayeredTimeline { // Define partitioning schema if needed if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid) { - let (partitioning, lsn) = pgdir.repartition(self.get_last_record_lsn())?; + let (partitioning, lsn) = + pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?; let timer = self.create_images_time_histo.start_timer(); // 2. Create new image layers for partitions that have been modified // "enough". diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 2e0040f0c0..af12084766 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -6,7 +6,7 @@ //! walingest.rs handles a few things like implicit relation creation and extension. //! Clarify that) //! -use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES}; +use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum}; use crate::reltag::{RelTag, SlruKind}; use crate::repository::*; use crate::repository::{Repository, Timeline}; @@ -388,13 +388,13 @@ impl DatadirTimeline { Ok(result.to_keyspace()) } - pub fn repartition(&self, lsn: Lsn) -> Result<(KeyPartitioning, Lsn)> { + pub fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> { let mut partitioning_guard = self.partitioning.lock().unwrap(); if partitioning_guard.1 == Lsn(0) || lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold { let keyspace = self.collect_keyspace(lsn)?; - let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES); + let partitioning = keyspace.partition(partition_size); *partitioning_guard = (partitioning, lsn); return Ok((partitioning_guard.0.clone(), lsn)); } @@ -1215,7 +1215,7 @@ pub fn create_test_timeline( timeline_id: zenith_utils::zid::ZTimelineId, ) -> Result>> { let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; - let tline = DatadirTimeline::new(tline, crate::layered_repository::tests::TEST_FILE_SIZE / 10); + let tline = DatadirTimeline::new(tline, tline.conf.compaction_target_size / 10); let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; m.commit()?;