Compare commits

...

9 Commits

Author SHA1 Message Date
Anastasia Lubennikova
4d3b15fc3e Remove fields that are present in TenantConf from global PageserverConf to avoid confusion 2022-04-08 15:48:56 +03:00
Anastasia Lubennikova
c93d31f9db Move tenant_config code to a separate module.
Add checksum to tenantconf file.
Save tenant conf file to tenant's directory, when tenant is created.
2022-04-08 15:48:56 +03:00
Anastasia Lubennikova
47ece2be8b Add compaction_target_size to per-tenant config 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
3da8233f08 Fixes after rebase 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
51c7ceb1d9 Add test for per-tenant config 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
5169d664c5 Add 'show' query to pageserver protocol for tenant-specific config parameters 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
1bb364b5bc Address code review issues 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
7e6eff4969 Save tenant config in pageserver directory 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
baac8ac410 Define PiTR interval for GC and make it possible to specify per-tenant configuration parameters
refer #1320
2022-04-08 15:48:56 +03:00
17 changed files with 553 additions and 165 deletions

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
@@ -342,10 +343,25 @@ impl PageServerNode {
pub fn tenant_create(
&self,
new_tenant_id: Option<ZTenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest { new_tenant_id })
.json(&TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>().unwrap()),
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>().unwrap()),
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
gc_horizon: settings
.get("gc_horizon")
.map(|x| x.parse::<u64>().unwrap()),
gc_period: settings.get("gc_period").map(|x| x.to_string()),
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
})
.send()?
.error_from_body()?
.json::<Option<String>>()?;

View File

@@ -27,21 +27,6 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -62,13 +47,6 @@ pub mod defaults {
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
@@ -94,22 +72,6 @@ pub struct PageServerConf {
/// Example (default): 127.0.0.1:9898
pub listen_http_addr: String,
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
pub compaction_period: Duration,
pub gc_horizon: u64,
pub gc_period: Duration,
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete.
@@ -158,14 +120,6 @@ struct PageServerConfigBuilder {
listen_http_addr: BuilderValue<String>,
checkpoint_distance: BuilderValue<u64>,
compaction_target_size: BuilderValue<u64>,
compaction_period: BuilderValue<Duration>,
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,
@@ -194,13 +148,6 @@ impl Default for PageServerConfigBuilder {
Self {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE),
compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period")),
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -229,26 +176,6 @@ impl PageServerConfigBuilder {
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
}
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
pub fn compaction_target_size(&mut self, compaction_target_size: u64) {
self.compaction_target_size = BuilderValue::Set(compaction_target_size)
}
pub fn compaction_period(&mut self, compaction_period: Duration) {
self.compaction_period = BuilderValue::Set(compaction_period)
}
pub fn gc_horizon(&mut self, gc_horizon: u64) {
self.gc_horizon = BuilderValue::Set(gc_horizon)
}
pub fn gc_period(&mut self, gc_period: Duration) {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}
@@ -304,19 +231,6 @@ impl PageServerConfigBuilder {
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
compaction_target_size: self
.compaction_target_size
.ok_or(anyhow::anyhow!("missing compaction_target_size"))?,
compaction_period: self
.compaction_period
.ok_or(anyhow::anyhow!("missing compaction_period"))?,
gc_horizon: self
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
@@ -448,13 +362,6 @@ impl PageServerConf {
match key {
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
"compaction_target_size" => {
builder.compaction_target_size(parse_toml_u64(key, item)?)
}
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -587,11 +494,6 @@ impl PageServerConf {
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
PageServerConf {
id: ZNodeId(0),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -658,14 +560,6 @@ mod tests {
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
checkpoint_distance = 111 # in bytes
compaction_target_size = 111 # in bytes
compaction_period = '111 s'
gc_period = '222 s'
gc_horizon = 222
wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'
@@ -697,11 +591,6 @@ id = 10
id: ZNodeId(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -742,11 +631,6 @@ id = 10
id: ZNodeId(10),
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
checkpoint_distance: 111,
compaction_target_size: 111,
compaction_period: Duration::from_secs(111),
gc_horizon: 222,
gc_period: Duration::from_secs(222),
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(),

View File

@@ -25,6 +25,12 @@ pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<ZTenantId>,
pub checkpoint_distance: Option<u64>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub pitr_interval: Option<String>,
}
#[serde_as]
@@ -36,3 +42,17 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
pub struct StatusResponse {
pub id: ZNodeId,
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
checkpoint_distance: None,
compaction_target_size: None,
compaction_period: None,
gc_horizon: None,
gc_period: None,
pitr_interval: None,
}
}
}
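
For context on the wire format: the new optional fields travel as plain JSON in the tenant-create HTTP request, and any field left as None falls back to the pageserver defaults in the handler. A minimal sketch of that shape, assuming only the serde/serde_json crates and a plain String standing in for the real ZTenantId/serde_with setup:

use serde::Serialize;

#[derive(Serialize)]
struct TenantCreateRequestSketch {
    new_tenant_id: Option<String>,
    checkpoint_distance: Option<u64>,
    compaction_target_size: Option<u64>,
    compaction_period: Option<String>,
    gc_horizon: Option<u64>,
    gc_period: Option<String>,
    pitr_interval: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // Durations are sent as humantime-style strings and parsed on the pageserver side;
    // sizes are plain integers (bytes). None fields serialize as null and keep the defaults.
    let req = TenantCreateRequestSketch {
        new_tenant_id: None,
        checkpoint_distance: Some(10_000),
        compaction_target_size: Some(1_048_576),
        compaction_period: Some("60 s".to_string()),
        gc_horizon: Some(1024),
        gc_period: Some("100 s".to_string()),
        pitr_interval: Some("30 days".to_string()),
    };
    println!("{}", serde_json::to_string_pretty(&req)?);
    Ok(())
}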

View File

@@ -333,6 +333,16 @@ paths:
new_tenant_id:
type: string
format: hex
gc_period:
type: string
gc_horizon:
type: integer
pitr_interval:
type: string
checkpoint_distance:
type: integer
compaction_period:
type: string
responses:
"201":
description: New tenant created successfully

View File

@@ -23,6 +23,7 @@ use super::models::{
};
use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
use crate::repository::Repository;
use crate::tenant_config::TenantConf;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
@@ -290,6 +291,28 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let remote_index = get_state(&request).remote_index.clone();
let mut tenant_conf = TenantConf::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period =
humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?;
}
if let Some(gc_horizon) = request_data.gc_horizon {
tenant_conf.gc_horizon = gc_horizon;
}
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval =
humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?;
}
if let Some(checkpoint_distance) = request_data.checkpoint_distance {
tenant_conf.checkpoint_distance = checkpoint_distance;
}
if let Some(compaction_target_size) = request_data.compaction_target_size {
tenant_conf.compaction_target_size = compaction_target_size;
}
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?;
}
let target_tenant_id = request_data
.new_tenant_id
.map(ZTenantId::from)
@@ -297,8 +320,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
let conf = get_config(&request);
tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index)
tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
})
.await
.map_err(ApiError::from_err)??;

View File

@@ -29,11 +29,13 @@ use std::ops::{Bound::Included, Deref, Range};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::Instant;
use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf;
use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfFile};
use crate::page_cache;
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
use crate::repository::{
@@ -117,6 +119,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///
pub struct LayeredRepository {
pub conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC.
@@ -140,6 +144,10 @@ pub struct LayeredRepository {
impl Repository for LayeredRepository {
type Timeline = LayeredTimeline;
fn get_tenant_conf(&self) -> TenantConf {
self.tenant_conf
}
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
let timelines = self.timelines.lock().unwrap();
self.get_timeline_internal(timelineid, &timelines)
@@ -269,6 +277,7 @@ impl Repository for LayeredRepository {
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let timeline_str = target_timelineid
@@ -278,7 +287,7 @@ impl Repository for LayeredRepository {
STORAGE_TIME
.with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
})
}
@@ -540,6 +549,7 @@ impl LayeredRepository {
pub fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenantid: ZTenantId,
remote_index: RemoteIndex,
@@ -548,6 +558,7 @@ impl LayeredRepository {
LayeredRepository {
tenantid,
conf,
tenant_conf,
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
@@ -556,6 +567,58 @@ impl LayeredRepository {
}
}
/// Save tenant's config to file
pub fn save_tenantconf(
conf: &'static PageServerConf,
tenantid: ZTenantId,
tenant_conf: TenantConf,
first_save: bool,
) -> Result<()> {
let _enter = info_span!("saving tenantconf").entered();
let path = TenantConf::tenantconf_path(conf, tenantid);
info!("save tenantconf to {}", path.display());
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = VirtualFile::open_with_options(
&path,
OpenOptions::new().write(true).create_new(first_save),
)?;
let data = TenantConfFile::from(tenant_conf);
let tenantconf_bytes = data.to_bytes().context("Failed to get tenantconf bytes")?;
if file.write(&tenantconf_bytes)? != tenantconf_bytes.len() {
bail!("Could not write all the tenantconf bytes in a single call");
}
file.sync_all()?;
// fsync the parent directory to ensure the directory entry is durable
if first_save {
let tenant_dir = File::open(
&path
.parent()
.expect("Tenantconf should always have a parent dir"),
)?;
tenant_dir.sync_all()?;
}
Ok(())
}
pub fn load_tenantconf(
conf: &'static PageServerConf,
tenantid: ZTenantId,
) -> Result<TenantConf> {
let path = TenantConf::tenantconf_path(conf, tenantid);
info!("loading tenantconf from {}", path.display());
let tenantconf_bytes = std::fs::read(&path)?;
let tenant_conf_file = TenantConfFile::from_bytes(&tenantconf_bytes);
match tenant_conf_file {
Ok(tenant_conf) => return Ok(tenant_conf.body),
Err(err) => return Err(err),
};
}
/// Save timeline metadata to file
fn save_metadata(
conf: &'static PageServerConf,
@@ -631,6 +694,7 @@ impl LayeredRepository {
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let _span_guard =
@@ -706,7 +770,7 @@ impl LayeredRepository {
timeline.checkpoint(CheckpointConfig::Forced)?;
info!("timeline {} checkpoint_before_gc done", timelineid);
}
timeline.update_gc_info(branchpoints, cutoff);
timeline.update_gc_info(branchpoints, cutoff, pitr);
let result = timeline.gc()?;
totals += result;
@@ -823,6 +887,11 @@ struct GcInfo {
///
/// FIXME: is this inclusive or exclusive?
cutoff: Lsn,
/// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
/// minus 'pitr_interval'
///
pitr: Duration,
}
/// Public interface functions
@@ -1028,6 +1097,7 @@ impl LayeredTimeline {
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
cutoff: Lsn(0),
pitr: Duration::ZERO,
}),
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
@@ -1370,9 +1440,11 @@ impl LayeredTimeline {
///
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
let tenant_conf = repo.get_tenant_conf();
let distance = last_lsn.widening_sub(self.last_freeze_at.load());
if distance >= self.conf.checkpoint_distance.into() {
if distance >= tenant_conf.checkpoint_distance.into() {
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
}
@@ -1578,13 +1650,18 @@ impl LayeredTimeline {
// above. Rewrite it.
let _compaction_cs = self.compaction_cs.lock().unwrap();
let target_file_size = self.conf.checkpoint_distance;
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
let tenant_conf = repo.get_tenant_conf();
let target_file_size = tenant_conf.checkpoint_distance;
// Define partitioning schema if needed
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
{
let (partitioning, lsn) =
pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?;
let (partitioning, lsn) = pgdir.repartition(
self.get_last_record_lsn(),
tenant_conf.compaction_target_size,
)?;
let timer = self.create_images_time_histo.start_timer();
// 2. Create new image layers for partitions that have been modified
// "enough".
@@ -1810,10 +1887,11 @@ impl LayeredTimeline {
/// the latest LSN subtracted by a constant, and doesn't do anything smart
/// to figure out what read-only nodes might actually need.)
///
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn) {
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
let mut gc_info = self.gc_info.write().unwrap();
gc_info.retain_lsns = retain_lsns;
gc_info.cutoff = cutoff;
gc_info.pitr = pitr;
}
///
@@ -1824,7 +1902,7 @@ impl LayeredTimeline {
/// obsolete.
///
fn gc(&self) -> Result<GcResult> {
let now = Instant::now();
let now = SystemTime::now();
let mut result: GcResult = Default::default();
let disk_consistent_lsn = self.get_disk_consistent_lsn();
@@ -1833,6 +1911,7 @@ impl LayeredTimeline {
let gc_info = self.gc_info.read().unwrap();
let retain_lsns = &gc_info.retain_lsns;
let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
@@ -1850,8 +1929,9 @@ impl LayeredTimeline {
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk image layers cover the layer's whole key range
// 2. it is older than PITR interval;
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
//
let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
@@ -1877,8 +1957,27 @@ impl LayeredTimeline {
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. Is it needed by a child branch?
// 2. Is it newer than the PITR interval?
// We use the modification time of the layer file to estimate its update time.
// This estimate is not precise, but maintaining an LSN->timestamp map seems like overkill.
// Users are not expected to need high precision here, and the estimate is conservative:
// a file's modification time is always newer than the actual time the version was
// created, so it errs on the side of keeping data.
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
debug!(
"keeping {} because it's modification time {:?} is newer than PITR {:?}",
l.filename().display(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
}
// 3. Is it needed by a child branch?
// NOTE: With that we would keep data that
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
@@ -1897,7 +1996,7 @@ impl LayeredTimeline {
}
}
// 3. Is there a later on-disk layer for this relation?
// 4. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -1938,7 +2037,7 @@ impl LayeredTimeline {
result.layers_removed += 1;
}
result.elapsed = now.elapsed();
result.elapsed = now.elapsed()?;
Ok(result)
}
@@ -2215,7 +2314,8 @@ pub mod tests {
}
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
@@ -2285,7 +2385,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
@@ -2362,7 +2462,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;

View File

@@ -10,6 +10,7 @@ pub mod pgdatadir_mapping;
pub mod reltag;
pub mod remote_storage;
pub mod repository;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;

View File

@@ -19,6 +19,7 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
use tracing::*;
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::auth::{self, JwtAuth};
@@ -672,6 +673,37 @@ impl postgres_backend::Handler for PageServerHandler {
}
}
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
ensure!(params.len() == 1, "invalid param number for config command");
let tenantid = ZTenantId::from_str(params[0])?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"compaction_target_size"),
RowDescriptor::int8_col(b"compaction_period"),
RowDescriptor::int8_col(b"gc_horizon"),
RowDescriptor::int8_col(b"gc_period"),
RowDescriptor::int8_col(b"pitr_interval"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(tenant_conf.checkpoint_distance.to_string().as_bytes()),
Some(tenant_conf.compaction_target_size.to_string().as_bytes()),
Some(
tenant_conf
.compaction_period
.as_secs()
.to_string()
.as_bytes(),
),
Some(tenant_conf.gc_horizon.to_string().as_bytes()),
Some(tenant_conf.gc_period.as_secs().to_string().as_bytes()),
Some(tenant_conf.pitr_interval.as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
@@ -689,16 +721,21 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
let gc_horizon: u64 = caps
.get(4)
.map(|h| h.as_str().parse())
.unwrap_or(Ok(self.conf.gc_horizon))?;
.unwrap_or(Ok(tenant_conf.gc_horizon))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"),
@@ -707,6 +744,7 @@ impl postgres_backend::Handler for PageServerHandler {
.write_message_noflush(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()),

View File

@@ -1,5 +1,6 @@
use crate::layered_repository::metadata::TimelineMetadata;
use crate::remote_storage::RemoteIndex;
use crate::tenant_config::TenantConf;
use crate::walrecord::ZenithWalRecord;
use crate::CheckpointConfig;
use anyhow::{bail, Result};
@@ -249,6 +250,7 @@ pub trait Repository: Send + Sync {
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
@@ -261,6 +263,8 @@ pub trait Repository: Send + Sync {
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
fn get_remote_index(&self) -> &RemoteIndex;
fn get_tenant_conf(&self) -> TenantConf;
}
/// A timeline, that belongs to the current repository.
@@ -303,6 +307,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
pub struct GcResult {
pub layers_total: u64,
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -313,6 +318,7 @@ pub struct GcResult {
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.layers_total += other.layers_total;
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_not_updated += other.layers_not_updated;
@@ -454,6 +460,7 @@ pub mod repo_harness {
pub struct RepoHarness<'a> {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: ZTenantId,
pub lock_guard: (
@@ -485,12 +492,15 @@ pub mod repo_harness {
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenant_conf = TenantConf::dummy_conf();
let tenant_id = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.timelines_path(&tenant_id))?;
Ok(Self {
conf,
tenant_conf,
tenant_id,
lock_guard,
})
@@ -505,6 +515,7 @@ pub mod repo_harness {
let repo = LayeredRepository::new(
self.conf,
self.tenant_conf,
walredo_mgr,
self.tenant_id,
RemoteIndex::empty(),
@@ -720,7 +731,7 @@ mod tests {
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
@@ -771,7 +782,7 @@ mod tests {
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) {
@@ -794,7 +805,7 @@ mod tests {
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
@@ -813,7 +824,7 @@ mod tests {
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
// run gc on parent
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// Check that the data is still accessible on the branch.
assert_eq!(

View File

@@ -0,0 +1,220 @@
//! Functions for handling per-tenant configuration options
//!
//! If a tenant is created with the --config option,
//! the tenant-specific config is stored in the tenant's directory.
//! Otherwise, the global pageserver config is used.
//!
//! If the tenant config file is corrupted, the tenant will be disabled.
//! We cannot fall back to the global or default config instead, because wrong settings
//! may lead to data loss.
//!
use crate::config::PageServerConf;
use crate::STORAGE_FORMAT_VERSION;
use anyhow::ensure;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::zid::ZTenantId;
pub const TENANT_CONFIG_NAME: &str = "config";
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
pub checkpoint_distance: u64,
pub compaction_target_size: u64,
pub compaction_period: Duration,
pub gc_horizon: u64,
pub gc_period: Duration,
pub pitr_interval: Duration,
}
/// We assume that a write of up to TENANTCONF_MAX_SIZE bytes is atomic.
///
/// This is the same assumption that PostgreSQL makes with the control file,
/// see PG_CONTROL_MAX_SAFE_SIZE
const TENANTCONF_MAX_SIZE: usize = 512;
/// TenantConfFile is stored on disk in the tenant's directory
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantConfFile {
hdr: TenantConfHeader,
pub body: TenantConf,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TenantConfHeader {
checksum: u32, // CRC of serialized tenantconf body
size: u16, // size of serialized tenantconf
format_version: u16, // storage format version (used for compatibility checks)
}
const TENANTCONF_HDR_SIZE: usize = std::mem::size_of::<TenantConfHeader>();
impl TenantConfFile {
pub fn new(
checkpoint_distance: u64,
compaction_target_size: u64,
compaction_period: Duration,
gc_horizon: u64,
gc_period: Duration,
pitr_interval: Duration,
) -> Self {
Self {
hdr: TenantConfHeader {
checksum: 0,
size: 0,
format_version: STORAGE_FORMAT_VERSION,
},
body: TenantConf {
gc_period,
gc_horizon,
pitr_interval,
checkpoint_distance,
compaction_period,
compaction_target_size,
},
}
}
pub fn from(tconf: TenantConf) -> Self {
Self {
hdr: TenantConfHeader {
checksum: 0,
size: 0,
format_version: STORAGE_FORMAT_VERSION,
},
body: TenantConf {
gc_period: tconf.gc_period,
gc_horizon: tconf.gc_horizon,
pitr_interval: tconf.pitr_interval,
checkpoint_distance: tconf.checkpoint_distance,
compaction_period: tconf.compaction_period,
compaction_target_size: tconf.compaction_target_size,
},
}
}
pub fn from_bytes(tenantconf_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!(
tenantconf_bytes.len() == TENANTCONF_MAX_SIZE,
"tenantconf bytes size is wrong"
);
let hdr = TenantConfHeader::des(&tenantconf_bytes[0..TENANTCONF_HDR_SIZE])?;
ensure!(
hdr.format_version == STORAGE_FORMAT_VERSION,
"format version mismatch"
);
let tenantconf_size = hdr.size as usize;
ensure!(
tenantconf_size <= TENANTCONF_MAX_SIZE,
"corrupted tenantconf file"
);
let calculated_checksum =
crc32c::crc32c(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size]);
ensure!(
hdr.checksum == calculated_checksum,
"tenantconf checksum mismatch"
);
let body = TenantConf::des(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size])?;
Ok(TenantConfFile { hdr, body })
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let body_bytes = self.body.ser()?;
let tenantconf_size = TENANTCONF_HDR_SIZE + body_bytes.len();
let hdr = TenantConfHeader {
size: tenantconf_size as u16,
format_version: STORAGE_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut tenantconf_bytes = vec![0u8; TENANTCONF_MAX_SIZE];
tenantconf_bytes[0..TENANTCONF_HDR_SIZE].copy_from_slice(&hdr_bytes);
tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size].copy_from_slice(&body_bytes);
Ok(tenantconf_bytes)
}
}
impl TenantConf {
pub fn default() -> TenantConf {
use defaults::*;
TenantConf {
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
}
}
/// Points to the place in the pageserver's local directory
/// where the given tenant's tenantconf file is located.
pub fn tenantconf_path(conf: &'static PageServerConf, tenantid: ZTenantId) -> PathBuf {
conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME)
}
#[cfg(test)]
pub fn dummy_conf() -> Self {
TenantConf {
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
pitr_interval: Duration::from_secs(60 * 60),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tenantconf_serializes_correctly() {
let original_tenantconf = TenantConfFile::new(
111,
111,
Duration::from_secs(111),
222,
Duration::from_secs(111),
Duration::from_secs(60 * 60),
);
let tenantconf_bytes = original_tenantconf
.to_bytes()
.expect("Should serialize correct tenantconf to bytes");
let deserialized_tenantconf = TenantConfFile::from_bytes(&tenantconf_bytes)
.expect("Should deserialize its own bytes");
assert_eq!(
deserialized_tenantconf.body, original_tenantconf.body,
"Tenantconf that was serialized to bytes and deserialized back should not change"
);
}
}
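
The period and interval strings above ('1 s', '100 s', '30 days', and whatever is passed over the HTTP API) all go through humantime::parse_duration, the same call TenantConf::default() uses. A small standalone sketch of the accepted format, assuming only the humantime crate:

use std::time::Duration;

fn main() {
    // Same parsing path as TenantConf::default(): a number plus a unit, space allowed.
    let gc_period: Duration = humantime::parse_duration("100 s").expect("valid duration");
    let pitr: Duration = humantime::parse_duration("30 days").expect("valid duration");

    assert_eq!(gc_period, Duration::from_secs(100));
    assert_eq!(pitr, Duration::from_secs(30 * 24 * 60 * 60));
    println!("gc_period = {:?}, pitr_interval = {:?}", gc_period, pitr);
}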

View File

@@ -5,6 +5,7 @@ use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository;
use crate::remote_storage::RemoteIndex;
use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::tenant_config::TenantConf;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
@@ -63,7 +64,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
TENANTS.lock().unwrap()
}
// Sets up wal redo manager and repository for tenant. Reduces code duplocation.
// Sets up wal redo manager and repository for tenant. Reduces code duplication.
// Used during pageserver startup, or when new tenant is attached to pageserver.
pub fn load_local_repo(
conf: &'static PageServerConf,
@@ -75,9 +76,15 @@ pub fn load_local_repo(
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Try to load config file
let tenant_conf = LayeredRepository::load_tenantconf(conf, tenant_id)
.with_context(|| format!("Failed to load tenant state for id {}", tenant_id))
.unwrap();
// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
conf,
tenant_conf,
Arc::new(walredo_mgr),
tenant_id,
remote_index.clone(),
@@ -174,6 +181,7 @@ pub fn shutdown_all_tenants() {
pub fn create_tenant_repository(
conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenantid: ZTenantId,
remote_index: RemoteIndex,
) -> Result<Option<ZTenantId>> {
@@ -186,6 +194,7 @@ pub fn create_tenant_repository(
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = timelines::create_repo(
conf,
tenant_conf,
tenantid,
CreateRepo::Real {
wal_redo_manager,
@@ -210,7 +219,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
/// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
let mut m = access_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -230,7 +239,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None,
"Compactor thread",
true,
move || crate::tenant_threads::compact_loop(tenant_id, conf),
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let gc_spawn_result = thread_mgr::spawn(
@@ -239,7 +248,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None,
"GC thread",
true,
move || crate::tenant_threads::gc_loop(tenant_id, conf),
move || crate::tenant_threads::gc_loop(tenant_id),
)
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
@@ -251,7 +260,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
}
@@ -290,7 +298,7 @@ pub fn get_timeline_for_tenant_load(
.get_timeline_load(timelineid)
.with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?;
let repartition_distance = tenant.repo.conf.checkpoint_distance / 10;
let repartition_distance = tenant.repo.get_tenant_conf().checkpoint_distance / 10;
let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
page_tline.init_logical_size()?;

View File

@@ -1,6 +1,5 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::config::PageServerConf;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use zenith_utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid, conf) {
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
}
}
fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
std::thread::sleep(conf.compaction_period);
std::thread::sleep(tenant_conf.compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
@@ -46,23 +47,29 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.gc_iteration(None, conf.gc_horizon, false)?;
if tenant_conf.gc_horizon > 0 {
repo.gc_iteration(
None,
tenant_conf.gc_horizon,
tenant_conf.pitr_interval,
false,
)?;
}
// TODO: Write this in a more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
let mut sleep_time = tenant_conf.gc_period.as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;

View File

@@ -23,6 +23,7 @@ use crate::{
layered_repository::metadata::TimelineMetadata,
remote_storage::RemoteIndex,
repository::{LocalTimelineState, Repository},
tenant_config::TenantConf,
DatadirTimeline, RepositoryImpl,
};
use crate::{import_datadir, LOG_FILE_NAME};
@@ -149,8 +150,8 @@ pub fn init_pageserver(
if let Some(tenant_id) = create_tenant {
println!("initializing tenantid {}", tenant_id);
let repo =
create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
let repo = create_repo(conf, TenantConf::default(), tenant_id, CreateRepo::Dummy)
.context("failed to create repo")?;
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
.context("failed to create initial timeline")?;
@@ -173,6 +174,7 @@ pub enum CreateRepo {
pub fn create_repo(
conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenant_id: ZTenantId,
create_repo: CreateRepo,
) -> Result<Arc<RepositoryImpl>> {
@@ -209,8 +211,12 @@ pub fn create_repo(
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
info!("created directory structure in {}", repo_dir.display());
// Save tenant's config
LayeredRepository::save_tenantconf(conf, tenant_id, tenant_conf, true)?;
Ok(Arc::new(LayeredRepository::new(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_index,

View File

@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
receivers.insert((tenantid, timelineid), receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::activate_tenant(conf, tenantid)?;
tenant_mgr::activate_tenant(tenantid)?;
}
};
Ok(())

View File

@@ -0,0 +1,31 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import ZenithEnvBuilder
def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
"""Test per-tenant configuration"""
env = zenith_env_builder.init_start()
tenant = env.zenith_cli.create_tenant(
conf={
'gc_period': '100sec',
'gc_horizon': '1024',
'pitr_interval': '3600sec',
'checkpoint_distance': '10000',
'compaction_period': '60sec',
'compaction_target_size': '1048576'
})
env.zenith_cli.create_timeline(f'test_tenant_conf', tenant_id=tenant)
pg = env.postgres.create_start(
"test_tenant_conf",
"main",
tenant,
)
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"show {tenant.hex}")
assert pscur.fetchone() == (10000, 1048576, 60, 1024, 100, 3600)

View File

@@ -853,13 +853,20 @@ class ZenithCli:
self.env = env
pass
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
def create_tenant(self,
tenant_id: Optional[uuid.UUID] = None,
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
"""
Creates a new tenant and returns its id.
"""
if tenant_id is None:
tenant_id = uuid.uuid4()
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
if conf is None:
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
else:
res = self.raw_cli(
['tenant', 'create', '--tenant-id', tenant_id.hex] +
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res.check_returncode()
return tenant_id

View File

@@ -164,7 +164,8 @@ fn main() -> Result<()> {
.subcommand(App::new("create")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
)
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
)
.subcommand(
App::new("pageserver")
@@ -521,8 +522,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
}
Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?;
let tenant_conf: HashMap<_, _> = create_match
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver
.tenant_create(initial_tenant_id)?
.tenant_create(initial_tenant_id, tenant_conf)?
.ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?;
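
The -c values are expected as key:value pairs; split_once(':') splits on the first colon only, and the resulting map is what PageServerNode::tenant_create() reads field by field. A standalone sketch of that parsing, with hypothetical input values:

use std::collections::HashMap;

fn main() {
    // Hypothetical `-c` arguments as they would come from clap's values_of("config").
    let raw = vec!["gc_horizon:1024", "pitr_interval:30 days", "compaction_period:60 s"];

    // Mirrors the handle_tenant() code above: split each entry on the first ':' only,
    // dropping entries without a colon.
    let tenant_conf: HashMap<&str, &str> = raw
        .iter()
        .flat_map(|c| c.split_once(':'))
        .collect();

    assert_eq!(tenant_conf.get("gc_horizon"), Some(&"1024"));
    assert_eq!(tenant_conf.get("pitr_interval"), Some(&"30 days"));
    println!("{:?}", tenant_conf);
}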