Define PiTR interval for GC and make it possible to specify per-tenant configuration parameters

Refers to #1320
Konstantin Knizhnik
2022-02-25 19:33:44 +03:00
committed by Anastasia Lubennikova
parent 214567bf8f
commit baac8ac410
11 changed files with 183 additions and 36 deletions

View File

@@ -345,7 +345,7 @@ impl PageServerNode {
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest { new_tenant_id })
.json(&TenantCreateRequest::new(new_tenant_id))
.send()?
.error_from_body()?
.json::<Option<String>>()?;

View File

@@ -41,6 +41,7 @@ pub mod defaults {
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -68,6 +69,7 @@ pub mod defaults {
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
@@ -109,6 +111,7 @@ pub struct PageServerConf {
pub gc_horizon: u64,
pub gc_period: Duration,
pub pitr_interval: Duration,
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
@@ -136,6 +139,27 @@ pub struct PageServerConf {
pub remote_storage_config: Option<RemoteStorageConfig>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantConf {
pub checkpoint_distance: u64,
pub compaction_period: Duration,
pub gc_horizon: u64,
pub gc_period: Duration,
pub pitr_interval: Duration,
}
impl TenantConf {
pub fn from(conf: &PageServerConf) -> TenantConf {
TenantConf {
gc_period: conf.gc_period,
gc_horizon: conf.gc_horizon,
pitr_interval: conf.pitr_interval,
checkpoint_distance: conf.checkpoint_distance,
compaction_period: conf.compaction_period,
}
}
}
// use dedicated enum for builder to better indicate the intention
// and avoid possible confusion with nested options
pub enum BuilderValue<T> {
@@ -165,6 +189,7 @@ struct PageServerConfigBuilder {
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
pitr_interval: BuilderValue<Duration>,
wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,
@@ -201,6 +226,8 @@ impl Default for PageServerConfigBuilder {
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
pitr_interval: Set(humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval")),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -249,6 +276,10 @@ impl PageServerConfigBuilder {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn pitr_interval(&mut self, pitr_interval: Duration) {
self.pitr_interval = BuilderValue::Set(pitr_interval)
}
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}
@@ -317,6 +348,7 @@ impl PageServerConfigBuilder {
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
pitr_interval: self.pitr_interval.ok_or(anyhow::anyhow!("missing pitr_interval"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
@@ -455,6 +487,7 @@ impl PageServerConf {
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"pitr_interval" => builder.pitr_interval(parse_toml_duration(key, item)?),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -592,6 +625,7 @@ impl PageServerConf {
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
pitr_interval: Duration::from_secs(60 * 60),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -666,6 +700,8 @@ compaction_period = '111 s'
gc_period = '222 s'
gc_horizon = 222
pitr_interval = '30 days'
wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'
@@ -702,6 +738,7 @@ id = 10
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
pitr_interval: humantime::parse_duration(defaults::DEFAULT_PITR_INTERVAL)?,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -749,6 +786,7 @@ id = 10
gc_period: Duration::from_secs(222),
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
pitr_interval: Duration::from_secs(30 * 24 * 60 * 60),
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,

View File

@@ -25,6 +25,11 @@ pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<ZTenantId>,
pub checkpoint_distance: Option<u64>,
pub compaction_period: Option<String>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub pitr_interval: Option<String>,
}
#[serde_as]
@@ -36,3 +41,16 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
pub struct StatusResponse {
pub id: ZNodeId,
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
checkpoint_distance: None,
compaction_period: None,
gc_horizon: None,
gc_period: None,
pitr_interval: None,
}
}
}
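
For illustration only (not part of the diff above): a client could populate the new optional fields when creating a tenant, leaving any field as None to fall back to the pageserver-wide defaults. This is a hedged sketch using the TenantCreateRequest struct added in this file; the tenant id and values are placeholders.

// Sketch: request a per-tenant PITR window and GC settings; other knobs
// stay at the pageserver defaults (None).
let request = TenantCreateRequest {
    new_tenant_id: Some(new_tenant_id),        // placeholder ZTenantId
    checkpoint_distance: None,                 // keep pageserver default
    compaction_period: None,                   // keep pageserver default
    gc_horizon: Some(64 * 1024 * 1024),        // bytes, matches DEFAULT_GC_HORIZON
    gc_period: Some("100 s".to_string()),      // humantime-formatted duration
    pitr_interval: Some("7 days".to_string()), // per-tenant PITR interval
};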

View File

@@ -333,6 +333,16 @@ paths:
new_tenant_id:
type: string
format: hex
gc_period:
type: string
gc_horizon:
type: integer
pitr_interval:
type: string
checkpoint_distance:
type: integer
compaction_period:
type: string
responses:
"201":
description: New tenant created successfully

View File

@@ -24,7 +24,7 @@ use super::models::{
use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
use crate::repository::Repository;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
use crate::{config::PageServerConf, config::TenantConf, tenant_mgr, timelines, ZTenantId};
struct State {
conf: &'static PageServerConf,
@@ -290,6 +290,26 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let remote_index = get_state(&request).remote_index.clone();
let conf = get_config(&request);
let mut tenant_conf = TenantConf::from(conf);
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period =
humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?;
}
if let Some(gc_horizon) = request_data.gc_horizon {
tenant_conf.gc_horizon = gc_horizon;
}
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval =
humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?;
}
if let Some(checkpoint_distance) = request_data.checkpoint_distance {
tenant_conf.checkpoint_distance = checkpoint_distance;
}
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?;
}
let target_tenant_id = request_data
.new_tenant_id
.map(ZTenantId::from)
@@ -298,7 +318,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index)
tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
})
.await
.map_err(ApiError::from_err)??;

View File

@@ -29,11 +29,12 @@ use std::ops::{Bound::Included, Deref, Range};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::Instant;
use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf;
use crate::keyspace::KeySpace;
use crate::config::{PageServerConf, TenantConf};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::page_cache;
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
use crate::repository::{
@@ -117,6 +118,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///
pub struct LayeredRepository {
pub conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC.
@@ -140,6 +143,10 @@ pub struct LayeredRepository {
impl Repository for LayeredRepository {
type Timeline = LayeredTimeline;
fn get_tenant_conf(&self) -> TenantConf {
self.tenant_conf
}
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
let timelines = self.timelines.lock().unwrap();
self.get_timeline_internal(timelineid, &timelines)
@@ -269,6 +276,7 @@ impl Repository for LayeredRepository {
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let timeline_str = target_timelineid
@@ -278,7 +286,7 @@ impl Repository for LayeredRepository {
STORAGE_TIME
.with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
})
}
@@ -540,6 +548,7 @@ impl LayeredRepository {
pub fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenantid: ZTenantId,
remote_index: RemoteIndex,
@@ -548,6 +557,7 @@ impl LayeredRepository {
LayeredRepository {
tenantid,
conf,
tenant_conf,
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
@@ -631,6 +641,7 @@ impl LayeredRepository {
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let _span_guard =
@@ -706,7 +717,7 @@ impl LayeredRepository {
timeline.checkpoint(CheckpointConfig::Forced)?;
info!("timeline {} checkpoint_before_gc done", timelineid);
}
timeline.update_gc_info(branchpoints, cutoff);
timeline.update_gc_info(branchpoints, cutoff, pitr);
let result = timeline.gc()?;
totals += result;
@@ -1824,7 +1835,7 @@ impl LayeredTimeline {
/// obsolete.
///
fn gc(&self) -> Result<GcResult> {
let now = Instant::now();
let now = SystemTime::now();
let mut result: GcResult = Default::default();
let disk_consistent_lsn = self.get_disk_consistent_lsn();
@@ -1833,6 +1844,7 @@ impl LayeredTimeline {
let gc_info = self.gc_info.read().unwrap();
let retain_lsns = &gc_info.retain_lsns;
let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
@@ -1850,8 +1862,9 @@ impl LayeredTimeline {
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk image layers cover the layer's whole key range
// 2. it is older than PITR interval;
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
//
let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
@@ -1877,8 +1890,33 @@ impl LayeredTimeline {
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. Is it needed by a child branch?
// 2. Is it newer than the PITR interval?
// We use the modification time of the layer file to estimate the update time.
// This estimate is not precise, but maintaining an LSN->timestamp map seems like overkill.
// Users are not expected to need high precision here, and the estimate is
// conservative: a file's modification time is always newer than the actual time
// the versions in it were created, so it is safe for users.
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
info!(
"keeping {:?} {}-{} because its modification time {:?} is newer than the PITR interval {:?}",
l.filename(),
l.get_start_lsn(),
l.get_end_lsn(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
}
// 3. Is it needed by a child branch?
// NOTE With that we would keep data that
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
@@ -1897,7 +1935,7 @@ impl LayeredTimeline {
}
}
// 3. Is there a later on-disk layer for this relation?
// 4. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -1938,7 +1976,7 @@ impl LayeredTimeline {
result.layers_removed += 1;
}
result.elapsed = now.elapsed();
result.elapsed = now.elapsed()?;
Ok(result)
}
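
The retention check added above reduces to comparing a layer file's modification time with the PITR window. A minimal, self-contained sketch of that comparison (assuming the crate's existing anyhow dependency; the path argument is a placeholder):

use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

// Sketch of the PITR test used in gc(): keep a layer if its mtime falls inside
// the PITR window. The mtime is a conservative stand-in for the creation time
// of the newest version stored in the layer.
fn needed_by_pitr(layer_path: &Path, pitr: Duration) -> anyhow::Result<bool> {
    let last_modified = fs::metadata(layer_path)?.modified()?;
    Ok(SystemTime::now().duration_since(last_modified)? < pitr)
}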

View File

@@ -19,6 +19,7 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
use tracing::*;
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::auth::{self, JwtAuth};
@@ -695,10 +696,11 @@ impl postgres_backend::Handler for PageServerHandler {
.unwrap_or(Ok(self.conf.gc_horizon))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"),
@@ -707,6 +709,7 @@ impl postgres_backend::Handler for PageServerHandler {
.write_message_noflush(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()),

View File

@@ -1,6 +1,7 @@
use crate::layered_repository::metadata::TimelineMetadata;
use crate::remote_storage::RemoteIndex;
use crate::walrecord::ZenithWalRecord;
use crate::config::TenantConf;
use crate::CheckpointConfig;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
@@ -249,6 +250,7 @@ pub trait Repository: Send + Sync {
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
@@ -261,6 +263,8 @@ pub trait Repository: Send + Sync {
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
fn get_remote_index(&self) -> &RemoteIndex;
fn get_tenant_conf(&self) -> TenantConf;
}
/// A timeline, that belongs to the current repository.
@@ -303,6 +307,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
pub struct GcResult {
pub layers_total: u64,
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -313,6 +318,7 @@ pub struct GcResult {
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.layers_total += other.layers_total;
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_not_updated += other.layers_not_updated;
@@ -505,6 +511,7 @@ pub mod repo_harness {
let repo = LayeredRepository::new(
self.conf,
TenantConf::from(self.conf),
walredo_mgr,
self.tenant_id,
RemoteIndex::empty(),
@@ -720,7 +727,7 @@ mod tests {
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
@@ -771,7 +778,7 @@ mod tests {
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) {
@@ -794,7 +801,7 @@ mod tests {
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
@@ -813,7 +820,7 @@ mod tests {
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
// run gc on parent
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// Check that the data is still accessible on the branch.
assert_eq!(

View File

@@ -1,7 +1,8 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use crate::config::PageServerConf;
use crate::branches;
use crate::config::{PageServerConf, TenantConf};
use crate::layered_repository::LayeredRepository;
use crate::remote_storage::RemoteIndex;
use crate::repository::{Repository, TimelineSyncStatusUpdate};
@@ -174,9 +175,13 @@ pub fn shutdown_all_tenants() {
pub fn create_tenant_repository(
conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenantid: ZTenantId,
remote_index: RemoteIndex,
) -> Result<Option<ZTenantId>> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = branches::create_repo(conf, tenant_conf, tenantid, wal_redo_manager)?;
match access_tenants().entry(tenantid) {
Entry::Occupied(_) => {
debug!("tenant {} already exists", tenantid);
@@ -184,8 +189,10 @@ pub fn create_tenant_repository(
}
Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = timelines::create_repo(
conf,
tenant_conf,
tenantid,
CreateRepo::Real {
wal_redo_manager,
@@ -210,7 +217,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
/// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
let mut m = access_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -230,7 +237,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None,
"Compactor thread",
true,
move || crate::tenant_threads::compact_loop(tenant_id, conf),
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let gc_spawn_result = thread_mgr::spawn(
@@ -239,7 +246,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None,
"GC thread",
true,
move || crate::tenant_threads::gc_loop(tenant_id, conf),
move || crate::tenant_threads::gc_loop(tenant_id),
)
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
@@ -251,7 +258,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
}

View File

@@ -1,6 +1,5 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::config::PageServerConf;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use zenith_utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid, conf) {
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
}
}
fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
std::thread::sleep(conf.compaction_period);
std::thread::sleep(tenant_conf.compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
@@ -46,23 +47,29 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.gc_iteration(None, conf.gc_horizon, false)?;
if tenant_conf.gc_horizon > 0 {
repo.gc_iteration(
None,
tenant_conf.gc_horizon,
tenant_conf.pitr_interval,
false,
)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
let mut sleep_time = tenant_conf.gc_period.as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
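
The TODO above suggests replacing the one-second polling sleep with a condvar-based wait. A hedged sketch of what that could look like (names are hypothetical, not part of this commit): a shutdown path would set the flag and notify the condvar, waking the GC loop immediately instead of waiting out the remaining one-second slices.

use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Hypothetical wakeup primitive for the GC loop: wait up to gc_period, but
// return early (true) if shutdown code flips `stopped` and notifies the condvar.
struct GcWakeup {
    stopped: Mutex<bool>,
    cond: Condvar,
}

impl GcWakeup {
    fn wait_or_stop(&self, period: Duration) -> bool {
        let guard = self.stopped.lock().unwrap();
        let (guard, _timed_out) = self
            .cond
            .wait_timeout_while(guard, period, |stopped| !*stopped)
            .unwrap();
        *guard
    }
}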

View File

@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
receivers.insert((tenantid, timelineid), receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::activate_tenant(conf, tenantid)?;
tenant_mgr::activate_tenant(tenantid)?;
}
};
Ok(())