Refactor the checkpoint and compaction functions.

The concept of a "checkpoint" had become quite muddled. This tries to
clarify it again.
This commit is contained in:
Heikki Linnakangas
2022-03-14 13:22:46 +02:00
parent 2d8587f67d
commit 09f2dff537
10 changed files with 158 additions and 128 deletions

View File

@@ -31,7 +31,8 @@ pub mod defaults {
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: &str = "1 s";
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
@@ -57,7 +58,7 @@ pub mod defaults {
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_period = '{DEFAULT_CHECKPOINT_PERIOD}'
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
@@ -91,7 +92,9 @@ pub struct PageServerConf {
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
pub checkpoint_distance: u64,
pub checkpoint_period: Duration,
// How often to check if there's compaction work to be done.
pub compaction_period: Duration,
pub gc_horizon: u64,
pub gc_period: Duration,
@@ -145,7 +148,8 @@ struct PageServerConfigBuilder {
listen_http_addr: BuilderValue<String>,
checkpoint_distance: BuilderValue<u64>,
checkpoint_period: BuilderValue<Duration>,
compaction_period: BuilderValue<Duration>,
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
@@ -179,8 +183,8 @@ impl Default for PageServerConfigBuilder {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)
.expect("cannot parse default checkpoint period")),
compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period")),
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
@@ -216,8 +220,8 @@ impl PageServerConfigBuilder {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
pub fn checkpoint_period(&mut self, checkpoint_period: Duration) {
self.checkpoint_period = BuilderValue::Set(checkpoint_period)
pub fn compaction_period(&mut self, compaction_period: Duration) {
self.compaction_period = BuilderValue::Set(compaction_period)
}
pub fn gc_horizon(&mut self, gc_horizon: u64) {
@@ -286,9 +290,9 @@ impl PageServerConfigBuilder {
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
checkpoint_period: self
.checkpoint_period
.ok_or(anyhow::anyhow!("missing checkpoint_period"))?,
compaction_period: self
.compaction_period
.ok_or(anyhow::anyhow!("missing compaction_period"))?,
gc_horizon: self
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
@@ -425,7 +429,7 @@ impl PageServerConf {
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
"checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
@@ -561,7 +565,7 @@ impl PageServerConf {
PageServerConf {
id: ZNodeId(0),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: Duration::from_secs(10),
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
wait_lsn_timeout: Duration::from_secs(60),
@@ -631,7 +635,8 @@ listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
checkpoint_distance = 111 # in bytes
checkpoint_period = '111 s'
compaction_period = '111 s'
gc_period = '222 s'
gc_horizon = 222
@@ -668,7 +673,7 @@ id = 10
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: humantime::parse_duration(defaults::DEFAULT_CHECKPOINT_PERIOD)?,
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
@@ -712,7 +717,7 @@ id = 10
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
checkpoint_distance: 111,
checkpoint_period: Duration::from_secs(111),
compaction_period: Duration::from_secs(111),
gc_horizon: 222,
gc_period: Duration::from_secs(222),
wait_lsn_timeout: Duration::from_secs(111),

View File

@@ -72,8 +72,6 @@ use layer_map::LayerMap;
use layer_map::SearchResult;
use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::keyspace::TARGET_FILE_SIZE_BYTES;
// re-export this function so that page_cache.rs can use it.
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -104,7 +102,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
/// Repository consists of multiple timelines. Keep them in a hash table.
///
pub struct LayeredRepository {
conf: &'static PageServerConf,
pub conf: &'static PageServerConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC.
@@ -246,23 +244,58 @@ impl Repository for LayeredRepository {
})
}
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> {
fn compaction_iteration(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
// compactions. We don't want to block everything else while the
// compaction runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint = timelines
let timelines_to_compact = timelines
.iter()
.map(|(timelineid, timeline)| (*timelineid, timeline.clone()))
.collect::<Vec<_>>();
drop(timelines);
for (timelineid, timeline) in &timelines_to_checkpoint {
for (timelineid, timeline) in &timelines_to_compact {
let _entered =
info_span!("compact", timeline = %timelineid, tenant = %self.tenantid).entered();
match timeline {
LayeredTimelineEntry::Local(timeline) => {
timeline.compact()?;
}
LayeredTimelineEntry::Remote { .. } => {
debug!("Cannot compact remote timeline {}", timelineid)
}
}
}
Ok(())
}
///
/// Flush all in-memory data to disk.
///
/// Used at shutdown.
///
fn checkpoint(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
.iter()
.map(|(timelineid, timeline)| (*timelineid, timeline.clone()))
.collect::<Vec<_>>();
drop(timelines);
for (timelineid, timeline) in &timelines_to_compact {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered();
match timeline {
LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?,
LayeredTimelineEntry::Local(timeline) => {
timeline.checkpoint(CheckpointConfig::Flush)?;
}
LayeredTimelineEntry::Remote { .. } => debug!(
"Cannot run the checkpoint for remote timeline {}",
timelineid
@@ -756,9 +789,9 @@ pub struct LayeredTimeline {
// Metrics histograms
reconstruct_time_histo: Histogram,
checkpoint_time_histo: Histogram,
flush_checkpoint_time_histo: Histogram,
forced_checkpoint_time_histo: Histogram,
flush_time_histo: Histogram,
compact_time_histo: Histogram,
create_images_time_histo: Histogram,
/// If `true`, will backup its files that appear after each checkpointing to the remote storage.
upload_relishes: AtomicBool,
@@ -860,23 +893,14 @@ impl Timeline for LayeredTimeline {
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.flush_checkpoint_time_histo
.observe_closure_duration(|| {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)
})
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)
}
CheckpointConfig::Forced => {
self.forced_checkpoint_time_histo
.observe_closure_duration(|| {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)?;
self.checkpoint_internal()
})
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)?;
self.compact()
}
CheckpointConfig::Distance => self
.checkpoint_time_histo
.observe_closure_duration(|| self.checkpoint_internal()),
}
}
@@ -946,23 +970,23 @@ impl LayeredTimeline {
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
.unwrap();
let checkpoint_time_histo = STORAGE_TIME
let flush_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"checkpoint",
"layer flush",
&tenantid.to_string(),
&timelineid.to_string(),
])
.unwrap();
let flush_checkpoint_time_histo = STORAGE_TIME
let compact_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"flush checkpoint",
"compact",
&tenantid.to_string(),
&timelineid.to_string(),
])
.unwrap();
let forced_checkpoint_time_histo = STORAGE_TIME
let create_images_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"forced checkpoint",
"create images",
&tenantid.to_string(),
&timelineid.to_string(),
])
@@ -989,9 +1013,9 @@ impl LayeredTimeline {
ancestor_lsn: metadata.ancestor_lsn(),
reconstruct_time_histo,
checkpoint_time_histo,
flush_checkpoint_time_histo,
forced_checkpoint_time_histo,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
upload_relishes: AtomicBool::new(upload_relishes),
@@ -1331,37 +1355,6 @@ impl LayeredTimeline {
drop(layers);
}
///
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self) -> Result<()> {
info!("checkpoint starting");
// Prevent concurrent checkpoints
// FIXME: This does compaction now, not the flushing of layers.
// Is this lock still needed?
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
// Create new image layers to allow GC and to reduce read latency
// TODO: the threshold for how often we create image layers is
// currently hard-coded at 3. It means, write out a new image layer,
// if there are at least three delta layers on top of it.
self.compact(TARGET_FILE_SIZE_BYTES as usize)?;
// TODO: We should also compact existing delta layers here.
// Call unload() on all frozen layers, to release memory.
// This shouldn't be much memory, as only metadata is slurped
// into memory.
let layers = self.layers.lock().unwrap();
for layer in layers.iter_historic_layers() {
layer.unload()?;
}
drop(layers);
Ok(())
}
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
@@ -1402,6 +1395,8 @@ impl LayeredTimeline {
}
};
let timer = self.flush_time_histo.start_timer();
loop {
let layers = self.layers.lock().unwrap();
if let Some(frozen_layer) = layers.frozen_layers.front() {
@@ -1422,6 +1417,8 @@ impl LayeredTimeline {
}
}
timer.stop_and_record();
Ok(())
}
@@ -1525,13 +1522,13 @@ impl LayeredTimeline {
Ok(())
}
fn compact(&self, target_file_size: usize) -> Result<()> {
pub fn compact(&self) -> Result<()> {
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into TARGET_FILE_SIZE chunks, but also take into
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
@@ -1561,10 +1558,13 @@ impl LayeredTimeline {
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let target_file_size = self.conf.checkpoint_distance;
// 1. The partitioning was already done by the code in
// pgdatadir_mapping.rs. We just use it here.
let partitioning_guard = self.partitioning.read().unwrap();
if let Some((partitioning, lsn)) = partitioning_guard.as_ref() {
let timer = self.create_images_time_histo.start_timer();
// Make a copy of the partitioning, so that we can release
// the lock. Otherwise we could block the WAL receiver.
let lsn = *lsn;
@@ -1578,12 +1578,25 @@ impl LayeredTimeline {
self.create_image_layer(partition, lsn)?;
}
}
timer.stop_and_record();
// 3. Compact
let timer = self.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
} else {
info!("Could not compact because no partitioning specified yet");
}
// Call unload() on all frozen layers, to release memory.
// This shouldn't be much memory, as only metadata is slurped
// into memory.
let layers = self.layers.lock().unwrap();
for layer in layers.iter_historic_layers() {
layer.unload()?;
}
drop(layers);
Ok(())
}
@@ -1643,7 +1656,7 @@ impl LayeredTimeline {
Ok(())
}
fn compact_level0(&self, target_file_size: usize) -> Result<()> {
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
let layers = self.layers.lock().unwrap();
// We compact or "shuffle" the level-0 delta layers when 10 have
@@ -1698,7 +1711,7 @@ impl LayeredTimeline {
if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size as u64 {
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
}
@@ -2032,7 +2045,7 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
/// file format and directory layout. The test here are more low level.
///
#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::repository::repo_harness::*;
@@ -2072,7 +2085,7 @@ mod tests {
// file size is much larger, maybe 1 GB. But a small size makes it
// much faster to exercise all the logic for creating the files,
// garbage collection, compaction etc.
const TEST_FILE_SIZE: usize = 4 * 1024 * 1024;
pub const TEST_FILE_SIZE: u64 = 4 * 1024 * 1024;
#[test]
fn test_images() -> Result<()> {
@@ -2088,7 +2101,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
let writer = tline.writer();
writer.put(TEST_KEY, Lsn(0x20), Value::Image(TEST_IMG("foo at 0x20")))?;
@@ -2096,7 +2109,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
let writer = tline.writer();
writer.put(TEST_KEY, Lsn(0x30), Value::Image(TEST_IMG("foo at 0x30")))?;
@@ -2104,7 +2117,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
let writer = tline.writer();
writer.put(TEST_KEY, Lsn(0x40), Value::Image(TEST_IMG("foo at 0x40")))?;
@@ -2112,7 +2125,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
assert_eq!(tline.get(TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10"));
assert_eq!(tline.get(TEST_KEY, Lsn(0x1f))?, TEST_IMG("foo at 0x10"));
@@ -2165,7 +2178,7 @@ mod tests {
tline.update_gc_info(Vec::new(), cutoff);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
tline.gc()?;
}
@@ -2239,7 +2252,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
tline.gc()?;
}
@@ -2325,7 +2338,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact(TEST_FILE_SIZE)?;
tline.compact()?;
tline.gc()?;
}

View File

@@ -41,8 +41,6 @@ pub const LOG_FILE_NAME: &str = "pageserver.log";
/// Config for the Repository checkpointer
#[derive(Debug, Clone, Copy)]
pub enum CheckpointConfig {
// Flush in-memory data that is older than this
Distance,
// Flush all in-memory data
Flush,
// Flush all in-memory data and reconstruct all page images

View File

@@ -729,6 +729,13 @@ impl postgres_backend::Handler for PageServerHandler {
.context("Failed to fetch local timeline for checkpoint request")?;
timeline.tline.checkpoint(CheckpointConfig::Forced)?;
// Also compact it.
//
// FIXME: This probably shouldn't be part of a "checkpoint" command, but a
// separate operation. Update the tests if you change this.
timeline.tline.compact()?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {

View File

@@ -35,6 +35,8 @@ where
pub tline: Arc<R::Timeline>,
pub last_partitioning: AtomicLsn,
pub current_logical_size: AtomicIsize,
pub repartition_threshold: u64,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -71,11 +73,12 @@ pub struct SlruSegmentDirectory {
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
impl<R: Repository> DatadirTimeline<R> {
pub fn new(tline: Arc<R::Timeline>) -> Self {
pub fn new(tline: Arc<R::Timeline>, repartition_threshold: u64) -> Self {
DatadirTimeline {
tline,
last_partitioning: AtomicLsn::new(0),
current_logical_size: AtomicIsize::new(0),
repartition_threshold,
}
}
@@ -1178,7 +1181,7 @@ pub fn create_test_timeline<R: Repository>(
timeline_id: zenith_utils::zid::ZTimelineId,
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline);
let tline = DatadirTimeline::new(tline, crate::layered_repository::tests::TEST_FILE_SIZE / 10);
let mut writer = tline.begin_record(Lsn(8));
writer.init_empty()?;

View File

@@ -194,6 +194,11 @@ pub trait Repository: Send + Sync {
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
/// Flush all data to disk.
///
/// This is used at graceful shutdown.
fn checkpoint(&self) -> Result<()>;
/// perform one garbage collection iteration, removing old data files from disk.
/// this function is periodically called by gc thread.
/// also it can be explicitly requested through page server api 'do_gc' command.
@@ -210,9 +215,9 @@ pub trait Repository: Send + Sync {
checkpoint_before_gc: bool,
) -> Result<GcResult>;
/// perform one checkpoint iteration, flushing in-memory data on disk.
/// this function is periodically called by checkponter thread.
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>;
/// Perform one compaction iteration.
/// This function is periodically called by the compactor thread.
fn compaction_iteration(&self) -> Result<()>;
}
/// A timeline, that belongs to the current repository.

View File

@@ -9,7 +9,6 @@ use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
use crate::walredo::PostgresRedoManager;
use crate::CheckpointConfig;
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{Context, Result};
use lazy_static::lazy_static;
@@ -152,7 +151,7 @@ pub fn shutdown_all_tenants() {
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Checkpointer), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
@@ -166,7 +165,7 @@ pub fn shutdown_all_tenants() {
debug!("shutdown tenant {}", tenantid);
match get_repository_for_tenant(tenantid) {
Ok(repo) => {
if let Err(err) = repo.checkpoint_iteration(CheckpointConfig::Flush) {
if let Err(err) = repo.checkpoint() {
error!(
"Could not checkpoint tenant {} during shutdown: {:?}",
tenantid, err
@@ -212,7 +211,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
}
///
/// Change the state of a tenant to Active and launch its checkpointer and GC
/// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> {
@@ -227,18 +226,18 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re
// If the tenant is already active, nothing to do.
TenantState::Active => {}
// If it's Idle, launch the checkpointer and GC threads
// If it's Idle, launch the compactor and GC threads
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Checkpointer,
ThreadKind::Compactor,
Some(tenantid),
None,
"Checkpointer thread",
move || crate::tenant_threads::checkpoint_loop(tenantid, conf),
"Compactor thread",
move || crate::tenant_threads::compact_loop(tenantid, conf),
)?;
// FIXME: if we fail to launch the GC thread, but already launched the
// checkpointer, we're in a strange state.
// compactor, we're in a strange state.
thread_mgr::spawn(
ThreadKind::GarbageCollector,
@@ -286,7 +285,9 @@ pub fn get_timeline_for_tenant(
.local_timeline()
.with_context(|| format!("cannot fetch timeline {}", timelineid))?;
let page_tline = Arc::new(DatadirTimelineImpl::new(tline));
let repartition_distance = tenant.repo.conf.checkpoint_distance / 10;
let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
page_tline.init_logical_size()?;
tenant.timelines.insert(timelineid, Arc::clone(&page_tline));
Ok(page_tline)

View File

@@ -1,44 +1,42 @@
//! This module contains functions to serve per-tenant background processes,
//! such as checkpointer and GC
//! such as compaction and GC
use crate::config::PageServerConf;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::CheckpointConfig;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use zenith_utils::zid::ZTenantId;
///
/// Checkpointer thread's main loop
/// Compaction thread's main loop
///
pub fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
if let Err(err) = checkpoint_loop_ext(tenantid, conf) {
error!("checkpoint loop terminated with error: {:?}", err);
pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid, conf) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn checkpoint_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
std::thread::sleep(conf.checkpoint_period);
trace!("checkpointer thread for tenant {} waking up", tenantid);
std::thread::sleep(conf.compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.checkpoint_iteration(CheckpointConfig::Distance)?;
repo.compaction_iteration()?;
}
trace!(
"checkpointer thread stopped for tenant {} state is {:?}",
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);

View File

@@ -92,8 +92,8 @@ pub enum ThreadKind {
// Thread that connects to a safekeeper to fetch WAL for one timeline.
WalReceiver,
// Thread that handles checkpointing of all timelines for a tenant.
Checkpointer,
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Thread that handles GC of a tenant
GarbageCollector,

View File

@@ -259,7 +259,7 @@ fn bootstrap_timeline<R: Repository>(
// Initdb lsn will be equal to last_record_lsn which will be set after import.
// Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
let timeline = repo.create_empty_timeline(tli, lsn)?;
let mut page_tline: DatadirTimeline<R> = DatadirTimeline::new(timeline);
let mut page_tline: DatadirTimeline<R> = DatadirTimeline::new(timeline, u64::MAX);
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?;
page_tline.tline.checkpoint(CheckpointConfig::Forced)?;