Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-09 14:32:57 +00:00
Add pageserver checkpoint_timeout option.
Flush the in-memory layer eventually even when no new data arrives; this helps safekeepers suspend activity (stop pushing to the broker). The default of 10m should be OK.
@@ -401,6 +401,7 @@ impl PageServerNode {
                 .get("checkpoint_distance")
                 .map(|x| x.parse::<u64>())
                 .transpose()?,
+            checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
             compaction_target_size: settings
                 .get("compaction_target_size")
                 .map(|x| x.parse::<u64>())
@@ -455,6 +456,7 @@ impl PageServerNode {
                 .map(|x| x.parse::<u64>())
                 .transpose()
                 .context("Failed to parse 'checkpoint_distance' as an integer")?,
+            checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
             compaction_target_size: settings
                 .get("compaction_target_size")
                 .map(|x| x.parse::<u64>())
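In these two hunks the settings map is string-keyed: numeric options are parsed eagerly with `parse::<u64>()` and `transpose()`, while `checkpoint_timeout` travels as a string and is parsed by the pageserver later. A minimal standalone sketch of the same idiom (illustrative names, not the actual PageServerNode code):

    use std::collections::HashMap;
    use std::num::ParseIntError;

    fn parse(settings: &HashMap<String, String>) -> Result<(Option<u64>, Option<String>), ParseIntError> {
        // Option<Result<u64, _>> -> Result<Option<u64>, _> via transpose()
        let checkpoint_distance = settings
            .get("checkpoint_distance")
            .map(|x| x.parse::<u64>())
            .transpose()?;
        // Kept as a string; the pageserver parses it with humantime later on.
        let checkpoint_timeout = settings.get("checkpoint_timeout").map(|x| x.to_string());
        Ok((checkpoint_distance, checkpoint_timeout))
    }

    fn main() {
        let mut s = HashMap::new();
        s.insert("checkpoint_timeout".to_string(), "10m".to_string());
        let (distance, timeout) = parse(&s).unwrap();
        assert_eq!(distance, None);
        assert_eq!(timeout.as_deref(), Some("10m"));
    }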
@@ -15,7 +15,7 @@ listen_pg_addr = '127.0.0.1:64000'
 listen_http_addr = '127.0.0.1:9898'

 checkpoint_distance = '268435456' # in bytes
-checkpoint_period = '1 s'
+checkpoint_timeout = '10m'

 gc_period = '100 s'
 gc_horizon = '67108864'
@@ -46,7 +46,7 @@ Note the `[remote_storage]` section: it's a [table](https://toml.io/en/v1.0.0#ta

 All values can be passed as an argument to the pageserver binary, using the `-c` parameter and specified as a valid TOML string. All tables should be passed in the inline form.

-Example: `${PAGESERVER_BIN} -c "checkpoint_period = '100 s'" -c "remote_storage={local_path='/some/local/path/'}"`
+Example: `${PAGESERVER_BIN} -c "checkpoint_timeout = '10 m'" -c "remote_storage={local_path='/some/local/path/'}"`

 Note that TOML distinguishes between strings and integers, the former require single or double quotes around them.

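Each `-c` argument must itself be valid TOML, which is why the quoting matters. A quick sketch of the distinction (assumes the `toml` crate; not part of this commit):

    fn main() {
        // A duration is a TOML string and needs quotes...
        let v: toml::Value = "checkpoint_timeout = '10 m'".parse().unwrap();
        assert!(v.get("checkpoint_timeout").unwrap().is_str());

        // ...while a bare number parses as a TOML integer.
        let v: toml::Value = "checkpoint_distance = 268435456".parse().unwrap();
        assert!(v.get("checkpoint_distance").unwrap().is_integer());
    }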
@@ -82,6 +82,14 @@ S3.

 The unit is # of bytes.

+#### checkpoint_timeout
+
+Apart from `checkpoint_distance`, flushing of the open layer is also
+triggered `checkpoint_timeout` after the last flush. This ensures that WAL
+is eventually uploaded to S3 even when activity stops.
+
+The default is 10m.
+
 #### compaction_period

 Every `compaction_period` seconds, the page server checks if
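The timeout value uses the humantime format. For reference, a few inputs the parser accepts or rejects (a sketch using the `humantime` crate that this commit relies on elsewhere):

    use std::time::Duration;

    fn main() {
        assert_eq!(humantime::parse_duration("10m").unwrap(), Duration::from_secs(600));
        assert_eq!(humantime::parse_duration("10 m").unwrap(), Duration::from_secs(600));
        assert_eq!(humantime::parse_duration("1h 30m").unwrap(), Duration::from_secs(5400));
        assert!(humantime::parse_duration("ten minutes").is_err()); // words are rejected
    }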
@@ -59,6 +59,7 @@ pub mod defaults {

 # [tenant_config]
 #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
+#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
 #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
 #compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
 #compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
@@ -452,6 +453,13 @@ impl PageServerConf {
                 Some(parse_toml_u64("checkpoint_distance", checkpoint_distance)?);
         }

+        if let Some(checkpoint_timeout) = item.get("checkpoint_timeout") {
+            t_conf.checkpoint_timeout = Some(parse_toml_duration(
+                "checkpoint_timeout",
+                checkpoint_timeout,
+            )?);
+        }
+
         if let Some(compaction_target_size) = item.get("compaction_target_size") {
             t_conf.compaction_target_size = Some(parse_toml_u64(
                 "compaction_target_size",
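`parse_toml_duration` is a pageserver-internal helper that this diff does not show. A plausible, purely hypothetical reconstruction of its contract, assuming it requires a TOML string and delegates to humantime:

    use anyhow::{anyhow, Result};
    use std::time::Duration;

    // Hypothetical sketch, not the actual pageserver helper.
    fn parse_toml_duration(name: &str, item: &toml::Value) -> Result<Duration> {
        let s = item
            .as_str()
            .ok_or_else(|| anyhow!("configure option {name} is not a string"))?;
        Ok(humantime::parse_duration(s)?)
    }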
@@ -32,6 +32,7 @@ pub struct TenantCreateRequest {
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub new_tenant_id: Option<ZTenantId>,
     pub checkpoint_distance: Option<u64>,
+    pub checkpoint_timeout: Option<String>,
     pub compaction_target_size: Option<u64>,
     pub compaction_period: Option<String>,
     pub compaction_threshold: Option<usize>,
@@ -70,6 +71,7 @@ pub struct TenantConfigRequest {
     #[serde(default)]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub checkpoint_distance: Option<u64>,
+    pub checkpoint_timeout: Option<String>,
     pub compaction_target_size: Option<u64>,
     pub compaction_period: Option<String>,
     pub compaction_threshold: Option<usize>,
@@ -87,6 +89,7 @@ impl TenantConfigRequest {
         TenantConfigRequest {
             tenant_id,
             checkpoint_distance: None,
+            checkpoint_timeout: None,
             compaction_target_size: None,
             compaction_period: None,
             compaction_threshold: None,
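On the wire, the new field is a JSON string. A standalone sketch of the request-body shape (a trimmed mirror of the structs above, not the actual pageserver types):

    use serde::Deserialize;

    #[derive(Deserialize, Debug)]
    struct ConfigRequestMirror {
        checkpoint_distance: Option<u64>,
        checkpoint_timeout: Option<String>, // parsed server-side with humantime
    }

    fn main() {
        let body = r#"{"checkpoint_timeout": "10m"}"#;
        let req: ConfigRequestMirror = serde_json::from_str(body).unwrap();
        assert_eq!(req.checkpoint_timeout.as_deref(), Some("10m"));
        assert_eq!(req.checkpoint_distance, None); // absent fields default to None
    }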
@@ -560,6 +560,8 @@ components:
           type: string
         checkpoint_distance:
           type: integer
+        checkpoint_timeout:
+          type: string
         compaction_period:
           type: string
         compaction_threshold:
@@ -578,6 +580,8 @@ components:
           type: string
         checkpoint_distance:
           type: integer
+        checkpoint_timeout:
+          type: string
         compaction_period:
           type: string
         compaction_threshold:
@@ -623,6 +623,11 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
     }

     tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
+    if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
+        tenant_conf.checkpoint_timeout =
+            Some(humantime::parse_duration(&checkpoint_timeout).map_err(ApiError::from_err)?);
+    }
+
     tenant_conf.compaction_target_size = request_data.compaction_target_size;
     tenant_conf.compaction_threshold = request_data.compaction_threshold;
@@ -683,6 +688,10 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
     }

     tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
+    if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
+        tenant_conf.checkpoint_timeout =
+            Some(humantime::parse_duration(&checkpoint_timeout).map_err(ApiError::from_err)?);
+    }
     tenant_conf.compaction_target_size = request_data.compaction_target_size;
     tenant_conf.compaction_threshold = request_data.compaction_threshold;
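Note the asymmetry in both handlers: `checkpoint_distance` is assigned unconditionally, so omitting it from the request clears any previous override, while `checkpoint_timeout` is only touched when the field is present. Reduced to a sketch:

    fn main() {
        let from_request: Option<u64> = None; // field absent from the request body

        // checkpoint_distance style: unconditional assignment drops the old value.
        let mut distance = Some(1024u64);
        distance = from_request;
        assert_eq!(distance, None);

        // checkpoint_timeout style: update only when the field is present.
        let mut timeout = Some(600u64);
        if let Some(t) = from_request {
            timeout = Some(t);
        }
        assert_eq!(timeout, Some(600)); // previous value survives
    }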
@@ -433,6 +433,13 @@ impl LayeredRepository {
             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
     }

+    pub fn get_checkpoint_timeout(&self) -> Duration {
+        let tenant_conf = self.tenant_conf.read().unwrap();
+        tenant_conf
+            .checkpoint_timeout
+            .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
+    }
+
     pub fn get_compaction_target_size(&self) -> u64 {
         let tenant_conf = self.tenant_conf.read().unwrap();
         tenant_conf
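These getters all follow the same per-tenant override pattern: a tenant-level `Option` falls back to the global default. A minimal sketch of the pattern (simplified names, no locking):

    use std::time::Duration;

    struct GlobalDefaults {
        checkpoint_timeout: Duration,
    }

    struct TenantOverrides {
        checkpoint_timeout: Option<Duration>,
    }

    fn effective_timeout(tenant: &TenantOverrides, defaults: &GlobalDefaults) -> Duration {
        tenant.checkpoint_timeout.unwrap_or(defaults.checkpoint_timeout)
    }

    fn main() {
        let defaults = GlobalDefaults { checkpoint_timeout: Duration::from_secs(600) };
        let unset = TenantOverrides { checkpoint_timeout: None };
        let custom = TenantOverrides { checkpoint_timeout: Some(Duration::from_secs(60)) };
        assert_eq!(effective_timeout(&unset, &defaults), Duration::from_secs(600));
        assert_eq!(effective_timeout(&custom, &defaults), Duration::from_secs(60));
    }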
@@ -16,7 +16,7 @@ use std::ops::{Deref, Range};
 use std::path::PathBuf;
 use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant, SystemTime};

 use metrics::{
     register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
@@ -233,6 +233,8 @@ pub struct LayeredTimeline {
     pub layers: RwLock<LayerMap>,

     last_freeze_at: AtomicLsn,
+    // Atomic would be more appropriate here.
+    last_freeze_ts: RwLock<Instant>,

     // WAL redo manager
     walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
@@ -560,6 +562,13 @@ impl LayeredTimeline {
             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
     }

+    fn get_checkpoint_timeout(&self) -> Duration {
+        let tenant_conf = self.tenant_conf.read().unwrap();
+        tenant_conf
+            .checkpoint_timeout
+            .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
+    }
+
     fn get_compaction_target_size(&self) -> u64 {
         let tenant_conf = self.tenant_conf.read().unwrap();
         tenant_conf
@@ -649,6 +658,7 @@ impl LayeredTimeline {
             disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),

             last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0),
+            last_freeze_ts: RwLock::new(Instant::now()),

             ancestor_timeline: ancestor,
             ancestor_lsn: metadata.ancestor_lsn(),
@@ -1094,8 +1104,11 @@ impl LayeredTimeline {
     }

     ///
-    /// Check if more than 'checkpoint_distance' of WAL has been accumulated
-    /// in the in-memory layer, and initiate flushing it if so.
+    /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
+    /// the in-memory layer, and initiate flushing it if so.
+    ///
+    /// Also flush after a period of time without new data -- it helps
+    /// safekeepers to regard pageserver as caught up and suspend activity.
     ///
     pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
         let last_lsn = self.get_last_record_lsn();
@@ -1103,21 +1116,27 @@ impl LayeredTimeline {
         if let Some(open_layer) = &layers.open_layer {
             let open_layer_size = open_layer.size()?;
             drop(layers);
-            let distance = last_lsn.widening_sub(self.last_freeze_at.load());
+            let last_freeze_at = self.last_freeze_at.load();
+            let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
+            let distance = last_lsn.widening_sub(last_freeze_at);
             // Checkpointing the open layer can be triggered by layer size or LSN range.
             // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
             // we want to stay below that with a big margin. The LSN distance determines how
             // much WAL the safekeepers need to store.
             if distance >= self.get_checkpoint_distance().into()
                 || open_layer_size > self.get_checkpoint_distance()
+                || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
             {
                 info!(
-                    "check_checkpoint_distance {}, layer size {}",
-                    distance, open_layer_size
+                    "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
+                    distance,
+                    open_layer_size,
+                    last_freeze_ts.elapsed()
                 );

                 self.freeze_inmem_layer(true);
                 self.last_freeze_at.store(last_lsn);
+                *(self.last_freeze_ts.write().unwrap()) = Instant::now();

                 // Launch a thread to flush the frozen layer to disk, unless
                 // a thread was already running. (If the thread was running
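The flush decision now has three triggers: LSN distance, open layer size, and, new in this commit, elapsed time since the last freeze. The predicate, extracted into a standalone sketch (simplified signature, not the LayeredTimeline API):

    use std::time::{Duration, Instant};

    fn should_flush(
        distance: i128,            // WAL bytes since the last freeze (widening_sub result)
        open_layer_size: u64,      // current size of the in-memory layer
        checkpoint_distance: u64,
        last_freeze_ts: Instant,
        checkpoint_timeout: Duration,
    ) -> bool {
        distance >= checkpoint_distance as i128          // enough WAL accumulated
            || open_layer_size > checkpoint_distance     // layer grew too large for one upload
            || (distance > 0 && last_freeze_ts.elapsed() >= checkpoint_timeout) // idle timeout
    }

The `distance > 0` guard is what keeps a fully idle timeline from being frozen again and again once the timeout has elapsed: the timer only fires when there is actually something new to flush.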
@@ -1044,6 +1044,7 @@ impl postgres_backend::Handler for PageServerHandler {
             let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
             pgb.write_message_noflush(&BeMessage::RowDescription(&[
                 RowDescriptor::int8_col(b"checkpoint_distance"),
+                RowDescriptor::int8_col(b"checkpoint_timeout"),
                 RowDescriptor::int8_col(b"compaction_target_size"),
                 RowDescriptor::int8_col(b"compaction_period"),
                 RowDescriptor::int8_col(b"compaction_threshold"),
@@ -1054,6 +1055,12 @@ impl postgres_backend::Handler for PageServerHandler {
             ]))?
             .write_message_noflush(&BeMessage::DataRow(&[
                 Some(repo.get_checkpoint_distance().to_string().as_bytes()),
+                Some(
+                    repo.get_checkpoint_timeout()
+                        .as_secs()
+                        .to_string()
+                        .as_bytes(),
+                ),
                 Some(repo.get_compaction_target_size().to_string().as_bytes()),
                 Some(
                     repo.get_compaction_period()
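Since `checkpoint_timeout` is declared as an int8 column, the `show` output reports it in whole seconds: the 10m default comes back as 600, and any sub-second precision would be truncated by `as_secs`. A two-line check:

    use std::time::Duration;

    fn main() {
        assert_eq!(Duration::from_secs(600).as_secs().to_string(), "600");
        assert_eq!(Duration::from_millis(1500).as_secs(), 1); // truncation, not rounding
    }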
@@ -445,6 +445,7 @@ pub mod repo_harness {
         fn from(tenant_conf: TenantConf) -> Self {
             Self {
                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
+                checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
                 compaction_target_size: Some(tenant_conf.compaction_target_size),
                 compaction_period: Some(tenant_conf.compaction_period),
                 compaction_threshold: Some(tenant_conf.compaction_threshold),
@@ -23,6 +23,7 @@ pub mod defaults {
     // which is good for now to trigger bugs.
     // This parameter actually determines L0 layer file size.
     pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
+    pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";

     // Target file size, when creating image and delta layers.
     // This parameter determines L1 layer file size.
@@ -48,6 +49,9 @@ pub struct TenantConf {
     // page server crashes.
     // This parameter actually determines L0 layer file size.
     pub checkpoint_distance: u64,
+    // Inmemory layer is also flushed at least once in checkpoint_timeout to
+    // eventually upload WAL after activity is stopped.
+    pub checkpoint_timeout: Duration,
     // Target file size, when creating image and delta layers.
     // This parameter determines L1 layer file size.
     pub compaction_target_size: u64,
@@ -90,6 +94,7 @@ pub struct TenantConf {
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub struct TenantConfOpt {
     pub checkpoint_distance: Option<u64>,
+    pub checkpoint_timeout: Option<Duration>,
     pub compaction_target_size: Option<u64>,
     #[serde(with = "humantime_serde")]
     pub compaction_period: Option<Duration>,
@@ -113,6 +118,9 @@ impl TenantConfOpt {
             checkpoint_distance: self
                 .checkpoint_distance
                 .unwrap_or(global_conf.checkpoint_distance),
+            checkpoint_timeout: self
+                .checkpoint_timeout
+                .unwrap_or(global_conf.checkpoint_timeout),
             compaction_target_size: self
                 .compaction_target_size
                 .unwrap_or(global_conf.compaction_target_size),
@@ -142,6 +150,9 @@ impl TenantConfOpt {
         if let Some(checkpoint_distance) = other.checkpoint_distance {
             self.checkpoint_distance = Some(checkpoint_distance);
         }
+        if let Some(checkpoint_timeout) = other.checkpoint_timeout {
+            self.checkpoint_timeout = Some(checkpoint_timeout);
+        }
         if let Some(compaction_target_size) = other.compaction_target_size {
             self.compaction_target_size = Some(compaction_target_size);
         }
@@ -181,6 +192,8 @@ impl TenantConf {

         TenantConf {
             checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
+            checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
+                .expect("cannot parse default checkpoint timeout"),
             compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
             compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
                 .expect("cannot parse default compaction period"),
@@ -212,6 +225,7 @@ impl TenantConf {
     pub fn dummy_conf() -> Self {
         TenantConf {
             checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
+            checkpoint_timeout: Duration::from_secs(600),
             compaction_target_size: 4 * 1024 * 1024,
             compaction_period: Duration::from_secs(10),
             compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
@@ -178,16 +178,6 @@ pub async fn handle_walreceiver_connection(
             caught_up = true;
         }

-        let timeline_to_check = Arc::clone(&timeline);
-        tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
-            .await
-            .with_context(|| {
-                format!("Spawned checkpoint check task panicked for timeline {id}")
-            })?
-            .with_context(|| {
-                format!("Failed to check checkpoint distance for timeline {id}")
-            })?;
-
         Some(endlsn)
     }
@@ -208,6 +198,12 @@ pub async fn handle_walreceiver_connection(
             _ => None,
         };

+        let timeline_to_check = Arc::clone(&timeline);
+        tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
+            .await
+            .with_context(|| format!("Spawned checkpoint check task panicked for timeline {id}"))?
+            .with_context(|| format!("Failed to check checkpoint distance for timeline {id}"))?;
+
         if let Some(last_lsn) = status_update {
             let remote_index = repo.get_remote_index();
             let timeline_remote_consistent_lsn = remote_index
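The check also moved: previously it ran only in the branch that ingested new WAL; now it runs on every loop iteration after the status match, so the timeout can fire even when only keepalives arrive. The two chained `with_context` calls handle the two failure layers of `spawn_blocking`: the task may panic (a JoinError) or the check itself may return an error. A minimal standalone sketch of that pattern (assumes tokio and anyhow):

    use anyhow::{Context, Result};

    fn check() -> Result<()> {
        // Synchronous, potentially blocking work runs off the async executor.
        Ok(())
    }

    async fn run_check(id: u64) -> Result<()> {
        tokio::task::spawn_blocking(check)
            .await // Result<Result<()>, JoinError>
            .with_context(|| format!("Spawned checkpoint check task panicked for timeline {id}"))?
            .with_context(|| format!("Failed to check checkpoint distance for timeline {id}"))?;
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        run_check(1).await
    }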
@@ -727,7 +727,7 @@ where
             info!("setting local_start_lsn to {:?}", state.local_start_lsn);
         }
         // Initializing commit_lsn before acking first flushed record is
-        // important to let find_end_of_wal skip the whole in the beginning
+        // important to let find_end_of_wal skip the hole in the beginning
         // of the first segment.
         //
         // NB: on new clusters, this happens at the same time as
@@ -738,6 +738,10 @@ where

         // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
         self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
+        // Initializing remote_consistent_lsn sets that we have nothing to
+        // stream to pageserver(s) immediately after creation.
+        self.inmem.remote_consistent_lsn =
+            max(self.inmem.remote_consistent_lsn, state.timeline_start_lsn);

         state.acceptor_state.term_history = msg.term_history.clone();
         self.persist_control_file(state)?;
@@ -137,7 +137,7 @@ impl SharedState {
         self.is_wal_backup_required()
             // FIXME: add tracking of relevant pageservers and check them here individually,
             // otherwise migration won't work (we suspend too early).
-            || self.sk.inmem.remote_consistent_lsn <= self.sk.inmem.commit_lsn
+            || self.sk.inmem.remote_consistent_lsn < self.sk.inmem.commit_lsn
     }

     /// Mark timeline active/inactive and return whether s3 offloading requires
@@ -284,9 +284,12 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
     env.neon_cli.create_branch('test_safekeepers_wal_removal')
     pg = env.postgres.create_start('test_safekeepers_wal_removal')

+    # Note: it is important to insert at least two segments, as currently
+    # control file is synced roughly once in segment range and WAL is not
+    # removed until all horizons are persisted.
     pg.safe_psql_many([
         'CREATE TABLE t(key int primary key, value text)',
         "INSERT INTO t SELECT generate_series(1,100000), 'payload'",
+        "INSERT INTO t SELECT generate_series(1,200000), 'payload'",
     ])

     tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]