Add checkpoint_distance config parameter.

- Change hardcoded OLDEST_INMEM_DISTANCE value to pageserver config option checkpoint_distance.
- Get rid of 'force' flag in checkpoint_internal(). Use checkpoint_distance=0 instead.
This commit is contained in:
anastasia
2021-09-15 16:43:58 +03:00
committed by lubennikovaav
parent 87bc18972f
commit 98d4f9cea5
5 changed files with 43 additions and 33 deletions

View File

@@ -30,6 +30,10 @@ writes out the changes from in-memory layers into new layer files[]. This proces
is called "checkpointing". The page server only creates layer files for
relations that have been modified since the last checkpoint.
Configuration parameter `checkpoint_distance` defines the distance
from current LSN to perform checkpoint of in-memory layers.
Default is `DEFAULT_CHECKPOINT_DISTANCE`.
Set this parameter to `0` to force checkpoint of every layer.
### Compute node
Stateless Postgres node that stores data in pageserver.

View File

@@ -27,6 +27,7 @@ use zenith_utils::http::endpoint;
struct CfgFileParams {
listen_pg_addr: Option<String>,
listen_http_addr: Option<String>,
checkpoint_distance: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
pg_distrib_dir: Option<String>,
@@ -44,6 +45,7 @@ impl CfgFileParams {
Self {
listen_pg_addr: get_arg("listen-pg"),
listen_http_addr: get_arg("listen-http"),
checkpoint_distance: get_arg("checkpoint_distance"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
pg_distrib_dir: get_arg("postgres-distrib"),
@@ -58,6 +60,7 @@ impl CfgFileParams {
Self {
listen_pg_addr: self.listen_pg_addr.or(other.listen_pg_addr),
listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
@@ -82,6 +85,11 @@ impl CfgFileParams {
None => DEFAULT_HTTP_LISTEN_ADDR.to_owned(),
};
let checkpoint_distance: i128 = match self.checkpoint_distance.as_ref() {
Some(checkpoint_distance_str) => checkpoint_distance_str.parse()?,
None => DEFAULT_CHECKPOINT_DISTANCE,
};
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
Some(horizon_str) => horizon_str.parse()?,
None => DEFAULT_GC_HORIZON,
@@ -129,6 +137,7 @@ impl CfgFileParams {
listen_pg_addr,
listen_http_addr,
checkpoint_distance,
gc_horizon,
gc_period,
@@ -175,6 +184,12 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Initialize pageserver repo"),
)
.arg(
Arg::with_name("checkpoint_distance")
.long("checkpoint_distance")
.takes_value(true)
.help("Distance from current LSN to perform checkpoint of in-memory layers"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")

View File

@@ -70,15 +70,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
// Flush out an inmemory layer, if it's holding WAL older than this.
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
//
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
static OLDEST_INMEM_DISTANCE: i128 = 16 * 1024 * 1024;
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
@@ -309,14 +300,16 @@ impl LayeredRepository {
info!("checkpointer thread for tenant {} waking up", self.tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_INTERVAL
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
{
let timelines = self.timelines.lock().unwrap();
for (_timelineid, timeline) in timelines.iter() {
STORAGE_TIME
.with_label_values(&["checkpoint_timed"])
.observe_closure_duration(|| timeline.checkpoint_internal(false))?
.observe_closure_duration(|| {
timeline.checkpoint_internal(conf.checkpoint_distance)
})?
}
// release lock on 'timelines'
}
@@ -861,7 +854,8 @@ impl Timeline for LayeredTimeline {
fn checkpoint(&self) -> Result<()> {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
.observe_closure_duration(|| self.checkpoint_internal(true))
//pass checkpoint_distance=0 to force checkpoint
.observe_closure_duration(|| self.checkpoint_internal(0))
}
///
@@ -1215,9 +1209,8 @@ impl LayeredTimeline {
///
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint_internal(&self, force: bool) -> Result<()> {
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: i128) -> Result<()> {
// Grab lock on the layer map.
//
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
@@ -1257,15 +1250,15 @@ impl LayeredTimeline {
// Does this layer need freezing?
//
// Write out all in-memory layers that contain WAL older than OLDEST_INMEM_DISTANCE.
// Or if 'force' is true, write out all of them. If we reach a layer with the same
// Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE.
// If we reach a layer with the same
// generation number, we know that we have cycled through all layers that were open
// when we started. We don't want to process layers inserted after we started, to
// avoid getting into an infinite loop trying to process again entries that we
// inserted ourselves.
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
if distance < 0
|| (!force && distance < OLDEST_INMEM_DISTANCE)
|| distance < checkpoint_distance
|| oldest_generation == current_generation
{
info!(

View File

@@ -28,6 +28,11 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = "127.0.0.1:9898";
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
pub const DEFAULT_CHECKPOINT_DISTANCE: i128 = 64 * 1024 * 1024;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
@@ -50,6 +55,10 @@ pub struct PageServerConf {
pub daemonize: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
pub checkpoint_distance: i128,
pub gc_horizon: u64,
pub gc_period: Duration,
pub superuser: String,
@@ -135,7 +144,8 @@ impl PageServerConf {
fn dummy_conf(repo_dir: PathBuf) -> Self {
PageServerConf {
daemonize: false,
gc_horizon: 64 * 1024 * 1024,
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
listen_pg_addr: "127.0.0.1:5430".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),

View File

@@ -216,8 +216,6 @@ mod tests {
use postgres_ffi::xlog_utils::SIZEOF_CHECKPOINT;
use std::fs;
use std::str::FromStr;
use std::time::Duration;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
/// Arbitrary relation tag, for testing.
@@ -261,18 +259,8 @@ mod tests {
fs::create_dir_all(&repo_dir)?;
fs::create_dir_all(&repo_dir.join("timelines"))?;
let conf = PageServerConf {
daemonize: false,
gc_horizon: 64 * 1024 * 1024,
gc_period: Duration::from_secs(10),
listen_pg_addr: "127.0.0.1:5430".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
superuser: "zenith_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir: "".into(),
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
};
let conf = PageServerConf::dummy_conf(repo_dir);
// Make a static copy of the config. This can never be free'd, but that's
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));