Add compaction_target_size parameter

This commit is contained in:
Konstantin Knizhnik
2022-04-01 19:40:39 +03:00
committed by Anastasia Lubennikova
parent bef9b837f1
commit 572b3f48cf
4 changed files with 33 additions and 8 deletions

View File

@@ -30,8 +30,13 @@ pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
@@ -58,6 +63,7 @@ pub mod defaults {
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
@@ -91,8 +97,13 @@ pub struct PageServerConf {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
pub compaction_period: Duration,
@@ -149,6 +160,7 @@ struct PageServerConfigBuilder {
checkpoint_distance: BuilderValue<u64>,
compaction_target_size: BuilderValue<u64>,
compaction_period: BuilderValue<Duration>,
gc_horizon: BuilderValue<u64>,
@@ -183,6 +195,7 @@ impl Default for PageServerConfigBuilder {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE),
compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period")),
gc_horizon: Set(DEFAULT_GC_HORIZON),
@@ -220,6 +233,10 @@ impl PageServerConfigBuilder {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
/// Override the target size (in bytes) of layer files produced by
/// compaction — per the config comments in this commit, this determines
/// the L1 layer file size.
pub fn compaction_target_size(&mut self, compaction_target_size: u64) {
self.compaction_target_size = BuilderValue::Set(compaction_target_size)
}
/// Override how often the pageserver checks whether there is compaction
/// work to be done (see the `compaction_period` field documentation).
pub fn compaction_period(&mut self, compaction_period: Duration) {
self.compaction_period = BuilderValue::Set(compaction_period)
}
@@ -290,6 +307,9 @@ impl PageServerConfigBuilder {
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
compaction_target_size: self
.compaction_target_size
.ok_or(anyhow::anyhow!("missing compaction_target_size"))?,
compaction_period: self
.compaction_period
.ok_or(anyhow::anyhow!("missing compaction_period"))?,
@@ -429,6 +449,9 @@ impl PageServerConf {
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
"compaction_target_size" => {
builder.compaction_target_size(parse_toml_u64(key, item)?)
}
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
@@ -565,6 +588,7 @@ impl PageServerConf {
PageServerConf {
id: ZNodeId(0),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
@@ -636,6 +660,7 @@ listen_http_addr = '127.0.0.1:9898'
checkpoint_distance = 111 # in bytes
compaction_target_size = 111 # in bytes
compaction_period = '111 s'
gc_period = '222 s'
@@ -673,6 +698,7 @@ id = 10
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
@@ -717,6 +743,7 @@ id = 10
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
checkpoint_distance: 111,
compaction_target_size: 111,
compaction_period: Duration::from_secs(111),
gc_horizon: 222,
gc_period: Duration::from_secs(222),

View File

@@ -2,9 +2,6 @@ use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::pg_constants;
use std::ops::Range;
// Target file size, when creating image and delta layers
pub const TARGET_FILE_SIZE_BYTES: u64 = 128 * 1024 * 1024; // 128 MB
///
/// Represents a set of Keys, in a compact form.
///

View File

@@ -1581,7 +1581,8 @@ impl LayeredTimeline {
// Define partitioning schema if needed
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
{
let (partitioning, lsn) = pgdir.repartition(self.get_last_record_lsn())?;
let (partitioning, lsn) =
pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?;
let timer = self.create_images_time_histo.start_timer();
// 2. Create new image layers for partitions that have been modified
// "enough".

View File

@@ -6,7 +6,7 @@
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES};
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::*;
use crate::repository::{Repository, Timeline};
@@ -388,13 +388,13 @@ impl<R: Repository> DatadirTimeline<R> {
Ok(result.to_keyspace())
}
pub fn repartition(&self, lsn: Lsn) -> Result<(KeyPartitioning, Lsn)> {
pub fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
{
let keyspace = self.collect_keyspace(lsn)?;
let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES);
let partitioning = keyspace.partition(partition_size);
*partitioning_guard = (partitioning, lsn);
return Ok((partitioning_guard.0.clone(), lsn));
}
@@ -1215,7 +1215,7 @@ pub fn create_test_timeline<R: Repository>(
timeline_id: zenith_utils::zid::ZTimelineId,
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline, crate::layered_repository::tests::TEST_FILE_SIZE / 10);
let tline = DatadirTimeline::new(tline, tline.conf.compaction_target_size / 10);
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;