Compare commits

...

9 Commits

Author SHA1 Message Date
Anastasia Lubennikova
4d3b15fc3e Remove fields that are present in TenantConf from global PageserverConf to avoid confusion 2022-04-08 15:48:56 +03:00
Anastasia Lubennikova
c93d31f9db Move tenant_config code to a separate module.
Add checksum to tenantconf file.
Save tenant conf file to the tenant's directory when the tenant is created.
2022-04-08 15:48:56 +03:00
Anastasia Lubennikova
47ece2be8b Add compaction_target_size to per-tenant config 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
3da8233f08 Fixes after rebase 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
51c7ceb1d9 Add test for per-tenant config 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
5169d664c5 Add 'show' query to pageserver protocol for tenant-specific config parameters 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
1bb364b5bc Address code review issues 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
7e6eff4969 Save tenant config in pageserver directory 2022-04-08 15:48:56 +03:00
Konstantin Knizhnik
baac8ac410 Define PiTR interval for GC and make it possible to specify per-tenant configuration parameters
refer #1320
2022-04-08 15:48:56 +03:00
17 changed files with 553 additions and 165 deletions

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use std::net::TcpStream; use std::net::TcpStream;
use std::path::PathBuf; use std::path::PathBuf;
@@ -342,10 +343,25 @@ impl PageServerNode {
pub fn tenant_create( pub fn tenant_create(
&self, &self,
new_tenant_id: Option<ZTenantId>, new_tenant_id: Option<ZTenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<Option<ZTenantId>> { ) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url)) .http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest { new_tenant_id }) .json(&TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>().unwrap()),
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>().unwrap()),
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
gc_horizon: settings
.get("gc_horizon")
.map(|x| x.parse::<u64>().unwrap()),
gc_period: settings.get("gc_period").map(|x| x.to_string()),
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
})
.send()? .send()?
.error_from_body()? .error_from_body()?
.json::<Option<String>>()?; .json::<Option<String>>()?;
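For context, the new tenant_create() signature above takes the per-tenant settings as plain strings and parses only the size-valued ones on the client side. A minimal sketch of a caller (the settings map is real, the commented-out call site is illustrative):

use std::collections::HashMap;

fn main() {
    // Size-like settings must parse as u64 (the client unwraps the parse),
    // while period/interval settings are forwarded verbatim as humantime strings.
    let mut settings: HashMap<&str, &str> = HashMap::new();
    settings.insert("gc_horizon", "1024");
    settings.insert("pitr_interval", "30 days");
    // pageserver.tenant_create(None, settings)?;  // hypothetical call site
    assert_eq!(
        settings.get("gc_horizon").map(|x| x.parse::<u64>().unwrap()),
        Some(1024)
    );
}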

View File

@@ -27,21 +27,6 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s"; pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -62,13 +47,6 @@ pub mod defaults {
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}' #listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}' #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}' #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}' #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
@@ -94,22 +72,6 @@ pub struct PageServerConf {
/// Example (default): 127.0.0.1:9898 /// Example (default): 127.0.0.1:9898
pub listen_http_addr: String, pub listen_http_addr: String,
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
pub compaction_period: Duration,
pub gc_horizon: u64,
pub gc_period: Duration,
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration, pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete. // How long to wait for WAL redo to complete.
@@ -158,14 +120,6 @@ struct PageServerConfigBuilder {
listen_http_addr: BuilderValue<String>, listen_http_addr: BuilderValue<String>,
checkpoint_distance: BuilderValue<u64>,
compaction_target_size: BuilderValue<u64>,
compaction_period: BuilderValue<Duration>,
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
wait_lsn_timeout: BuilderValue<Duration>, wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>, wal_redo_timeout: BuilderValue<Duration>,
@@ -194,13 +148,6 @@ impl Default for PageServerConfigBuilder {
Self { Self {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()), listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()), listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE),
compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period")),
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT) wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")), .expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT) wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -229,26 +176,6 @@ impl PageServerConfigBuilder {
self.listen_http_addr = BuilderValue::Set(listen_http_addr) self.listen_http_addr = BuilderValue::Set(listen_http_addr)
} }
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
pub fn compaction_target_size(&mut self, compaction_target_size: u64) {
self.compaction_target_size = BuilderValue::Set(compaction_target_size)
}
pub fn compaction_period(&mut self, compaction_period: Duration) {
self.compaction_period = BuilderValue::Set(compaction_period)
}
pub fn gc_horizon(&mut self, gc_horizon: u64) {
self.gc_horizon = BuilderValue::Set(gc_horizon)
}
pub fn gc_period(&mut self, gc_period: Duration) {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) { pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout) self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
} }
@@ -304,19 +231,6 @@ impl PageServerConfigBuilder {
listen_http_addr: self listen_http_addr: self
.listen_http_addr .listen_http_addr
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?, .ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
compaction_target_size: self
.compaction_target_size
.ok_or(anyhow::anyhow!("missing compaction_target_size"))?,
compaction_period: self
.compaction_period
.ok_or(anyhow::anyhow!("missing compaction_period"))?,
gc_horizon: self
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
wait_lsn_timeout: self wait_lsn_timeout: self
.wait_lsn_timeout .wait_lsn_timeout
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?, .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
@@ -448,13 +362,6 @@ impl PageServerConf {
match key { match key {
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?), "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?), "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
"compaction_target_size" => {
builder.compaction_target_size(parse_toml_u64(key, item)?)
}
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?), "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?), "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?), "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -587,11 +494,6 @@ impl PageServerConf {
pub fn dummy_conf(repo_dir: PathBuf) -> Self { pub fn dummy_conf(repo_dir: PathBuf) -> Self {
PageServerConf { PageServerConf {
id: ZNodeId(0), id: ZNodeId(0),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
wait_lsn_timeout: Duration::from_secs(60), wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60), wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE, page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -658,14 +560,6 @@ mod tests {
listen_pg_addr = '127.0.0.1:64000' listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898' listen_http_addr = '127.0.0.1:9898'
checkpoint_distance = 111 # in bytes
compaction_target_size = 111 # in bytes
compaction_period = '111 s'
gc_period = '222 s'
gc_horizon = 222
wait_lsn_timeout = '111 s' wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s' wal_redo_timeout = '111 s'
@@ -697,11 +591,6 @@ id = 10
id: ZNodeId(10), id: ZNodeId(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?, wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?, wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(), superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -742,11 +631,6 @@ id = 10
id: ZNodeId(10), id: ZNodeId(10),
listen_pg_addr: "127.0.0.1:64000".to_string(), listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(), listen_http_addr: "127.0.0.1:9898".to_string(),
checkpoint_distance: 111,
compaction_target_size: 111,
compaction_period: Duration::from_secs(111),
gc_horizon: 222,
gc_period: Duration::from_secs(222),
wait_lsn_timeout: Duration::from_secs(111), wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111), wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(), superuser: "zzzz".to_string(),

View File

@@ -25,6 +25,12 @@ pub struct TenantCreateRequest {
#[serde(default)] #[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")] #[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<ZTenantId>, pub new_tenant_id: Option<ZTenantId>,
pub checkpoint_distance: Option<u64>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub pitr_interval: Option<String>,
} }
#[serde_as] #[serde_as]
@@ -36,3 +42,17 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
pub struct StatusResponse { pub struct StatusResponse {
pub id: ZNodeId, pub id: ZNodeId,
} }
impl TenantCreateRequest {
pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
checkpoint_distance: None,
compaction_target_size: None,
compaction_period: None,
gc_horizon: None,
gc_period: None,
pitr_interval: None,
}
}
}
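The new() helper above fills every optional field with None, which keeps existing call sites working and lets a caller override a single setting without naming the rest. A minimal sketch, assuming the TenantCreateRequest struct as defined in this file:

fn main() {
    // Illustrative only: override just the GC horizon; the pageserver applies
    // its defaults for every field left as None.
    let mut req = TenantCreateRequest::new(None);
    req.gc_horizon = Some(64 * 1024 * 1024);
    assert!(req.compaction_period.is_none());
}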

View File

@@ -333,6 +333,16 @@ paths:
new_tenant_id: new_tenant_id:
type: string type: string
format: hex format: hex
gc_period:
type: string
gc_horizon:
type: integer
pitr_interval:
type: string
checkpoint_distance:
type: integer
compaction_period:
type: string
responses: responses:
"201": "201":
description: New tenant created successfully description: New tenant created successfully

View File

@@ -23,6 +23,7 @@ use super::models::{
}; };
use crate::remote_storage::{schedule_timeline_download, RemoteIndex}; use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
use crate::repository::Repository; use crate::repository::Repository;
use crate::tenant_config::TenantConf;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo}; use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId}; use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
@@ -290,6 +291,28 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?; let request_data: TenantCreateRequest = json_request(&mut request).await?;
let remote_index = get_state(&request).remote_index.clone(); let remote_index = get_state(&request).remote_index.clone();
let mut tenant_conf = TenantConf::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period =
humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?;
}
if let Some(gc_horizon) = request_data.gc_horizon {
tenant_conf.gc_horizon = gc_horizon;
}
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval =
humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?;
}
if let Some(checkpoint_distance) = request_data.checkpoint_distance {
tenant_conf.checkpoint_distance = checkpoint_distance;
}
if let Some(compaction_target_size) = request_data.compaction_target_size {
tenant_conf.compaction_target_size = compaction_target_size;
}
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?;
}
let target_tenant_id = request_data let target_tenant_id = request_data
.new_tenant_id .new_tenant_id
.map(ZTenantId::from) .map(ZTenantId::from)
@@ -297,8 +320,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let new_tenant_id = tokio::task::spawn_blocking(move || { let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered(); let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
let conf = get_config(&request);
tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index) tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
}) })
.await .await
.map_err(ApiError::from_err)??; .map_err(ApiError::from_err)??;
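The duration-valued settings (gc_period, pitr_interval, compaction_period) arrive as strings and are parsed with humantime, the same crate used for the defaults elsewhere in this diff. A small sketch of the accepted format, using the literal default strings from this patch:

use std::time::Duration;

fn main() {
    // "100 s" and "30 days" are the default gc_period and pitr_interval strings.
    assert_eq!(humantime::parse_duration("100 s").unwrap(), Duration::from_secs(100));
    assert_eq!(
        humantime::parse_duration("30 days").unwrap(),
        Duration::from_secs(30 * 24 * 60 * 60)
    );
}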

View File

@@ -29,11 +29,13 @@ use std::ops::{Bound::Included, Deref, Range};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool}; use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::Instant; use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::keyspace::KeySpace; use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfFile};
use crate::page_cache; use crate::page_cache;
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex}; use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
use crate::repository::{ use crate::repository::{
@@ -117,6 +119,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
/// ///
pub struct LayeredRepository { pub struct LayeredRepository {
pub conf: &'static PageServerConf, pub conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenantid: ZTenantId, tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>, timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC. // This mutex prevents creation of new timelines during GC.
@@ -140,6 +144,10 @@ pub struct LayeredRepository {
impl Repository for LayeredRepository { impl Repository for LayeredRepository {
type Timeline = LayeredTimeline; type Timeline = LayeredTimeline;
fn get_tenant_conf(&self) -> TenantConf {
self.tenant_conf
}
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> { fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
let timelines = self.timelines.lock().unwrap(); let timelines = self.timelines.lock().unwrap();
self.get_timeline_internal(timelineid, &timelines) self.get_timeline_internal(timelineid, &timelines)
@@ -269,6 +277,7 @@ impl Repository for LayeredRepository {
&self, &self,
target_timelineid: Option<ZTimelineId>, target_timelineid: Option<ZTimelineId>,
horizon: u64, horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool, checkpoint_before_gc: bool,
) -> Result<GcResult> { ) -> Result<GcResult> {
let timeline_str = target_timelineid let timeline_str = target_timelineid
@@ -278,7 +287,7 @@ impl Repository for LayeredRepository {
STORAGE_TIME STORAGE_TIME
.with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str]) .with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
.observe_closure_duration(|| { .observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc) self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
}) })
} }
@@ -540,6 +549,7 @@ impl LayeredRepository {
pub fn new( pub fn new(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_conf: TenantConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenantid: ZTenantId, tenantid: ZTenantId,
remote_index: RemoteIndex, remote_index: RemoteIndex,
@@ -548,6 +558,7 @@ impl LayeredRepository {
LayeredRepository { LayeredRepository {
tenantid, tenantid,
conf, conf,
tenant_conf,
timelines: Mutex::new(HashMap::new()), timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()), gc_cs: Mutex::new(()),
walredo_mgr, walredo_mgr,
@@ -556,6 +567,58 @@ impl LayeredRepository {
} }
} }
/// Save tenant's config to file
pub fn save_tenantconf(
conf: &'static PageServerConf,
tenantid: ZTenantId,
tenant_conf: TenantConf,
first_save: bool,
) -> Result<()> {
let _enter = info_span!("saving tenantconf").entered();
let path = TenantConf::tenantconf_path(conf, tenantid);
info!("save tenantconf to {}", path.display());
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = VirtualFile::open_with_options(
&path,
OpenOptions::new().write(true).create_new(first_save),
)?;
let data = TenantConfFile::from(tenant_conf);
let tenantconf_bytes = data.to_bytes().context("Failed to get tenantconf bytes")?;
if file.write(&tenantconf_bytes)? != tenantconf_bytes.len() {
bail!("Could not write all the tenantconf bytes in a single call");
}
file.sync_all()?;
// fsync the parent directory to ensure the directory entry is durable
if first_save {
let tenant_dir = File::open(
&path
.parent()
.expect("Tenantconf should always have a parent dir"),
)?;
tenant_dir.sync_all()?;
}
Ok(())
}
pub fn load_tenantconf(
conf: &'static PageServerConf,
tenantid: ZTenantId,
) -> Result<TenantConf> {
let path = TenantConf::tenantconf_path(conf, tenantid);
info!("loading tenantconf from {}", path.display());
let tenantconf_bytes = std::fs::read(&path)?;
let tenant_conf_file = TenantConfFile::from_bytes(&tenantconf_bytes);
match tenant_conf_file {
Ok(tenant_conf) => return Ok(tenant_conf.body),
Err(err) => return Err(err),
};
}
/// Save timeline metadata to file /// Save timeline metadata to file
fn save_metadata( fn save_metadata(
conf: &'static PageServerConf, conf: &'static PageServerConf,
@@ -631,6 +694,7 @@ impl LayeredRepository {
&self, &self,
target_timelineid: Option<ZTimelineId>, target_timelineid: Option<ZTimelineId>,
horizon: u64, horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool, checkpoint_before_gc: bool,
) -> Result<GcResult> { ) -> Result<GcResult> {
let _span_guard = let _span_guard =
@@ -706,7 +770,7 @@ impl LayeredRepository {
timeline.checkpoint(CheckpointConfig::Forced)?; timeline.checkpoint(CheckpointConfig::Forced)?;
info!("timeline {} checkpoint_before_gc done", timelineid); info!("timeline {} checkpoint_before_gc done", timelineid);
} }
timeline.update_gc_info(branchpoints, cutoff); timeline.update_gc_info(branchpoints, cutoff, pitr);
let result = timeline.gc()?; let result = timeline.gc()?;
totals += result; totals += result;
@@ -823,6 +887,11 @@ struct GcInfo {
/// ///
/// FIXME: is this inclusive or exclusive? /// FIXME: is this inclusive or exclusive?
cutoff: Lsn, cutoff: Lsn,
/// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
/// minus 'pitr_interval'
///
pitr: Duration,
} }
/// Public interface functions /// Public interface functions
@@ -1028,6 +1097,7 @@ impl LayeredTimeline {
gc_info: RwLock::new(GcInfo { gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(), retain_lsns: Vec::new(),
cutoff: Lsn(0), cutoff: Lsn(0),
pitr: Duration::ZERO,
}), }),
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
@@ -1370,9 +1440,11 @@ impl LayeredTimeline {
/// ///
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> { pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn(); let last_lsn = self.get_last_record_lsn();
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
let tenant_conf = repo.get_tenant_conf();
let distance = last_lsn.widening_sub(self.last_freeze_at.load()); let distance = last_lsn.widening_sub(self.last_freeze_at.load());
if distance >= self.conf.checkpoint_distance.into() { if distance >= tenant_conf.checkpoint_distance.into() {
self.freeze_inmem_layer(true); self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn); self.last_freeze_at.store(last_lsn);
} }
@@ -1578,13 +1650,18 @@ impl LayeredTimeline {
// above. Rewrite it. // above. Rewrite it.
let _compaction_cs = self.compaction_cs.lock().unwrap(); let _compaction_cs = self.compaction_cs.lock().unwrap();
let target_file_size = self.conf.checkpoint_distance; let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
let tenant_conf = repo.get_tenant_conf();
let target_file_size = tenant_conf.checkpoint_distance;
// Define partitioning schema if needed // Define partitioning schema if needed
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid) if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
{ {
let (partitioning, lsn) = let (partitioning, lsn) = pgdir.repartition(
pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?; self.get_last_record_lsn(),
tenant_conf.compaction_target_size,
)?;
let timer = self.create_images_time_histo.start_timer(); let timer = self.create_images_time_histo.start_timer();
// 2. Create new image layers for partitions that have been modified // 2. Create new image layers for partitions that have been modified
// "enough". // "enough".
@@ -1810,10 +1887,11 @@ impl LayeredTimeline {
/// the latest LSN subtracted by a constant, and doesn't do anything smart /// the latest LSN subtracted by a constant, and doesn't do anything smart
/// to figure out what read-only nodes might actually need.) /// to figure out what read-only nodes might actually need.)
/// ///
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn) { fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
let mut gc_info = self.gc_info.write().unwrap(); let mut gc_info = self.gc_info.write().unwrap();
gc_info.retain_lsns = retain_lsns; gc_info.retain_lsns = retain_lsns;
gc_info.cutoff = cutoff; gc_info.cutoff = cutoff;
gc_info.pitr = pitr;
} }
/// ///
@@ -1824,7 +1902,7 @@ impl LayeredTimeline {
/// obsolete. /// obsolete.
/// ///
fn gc(&self) -> Result<GcResult> { fn gc(&self) -> Result<GcResult> {
let now = Instant::now(); let now = SystemTime::now();
let mut result: GcResult = Default::default(); let mut result: GcResult = Default::default();
let disk_consistent_lsn = self.get_disk_consistent_lsn(); let disk_consistent_lsn = self.get_disk_consistent_lsn();
@@ -1833,6 +1911,7 @@ impl LayeredTimeline {
let gc_info = self.gc_info.read().unwrap(); let gc_info = self.gc_info.read().unwrap();
let retain_lsns = &gc_info.retain_lsns; let retain_lsns = &gc_info.retain_lsns;
let cutoff = gc_info.cutoff; let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered(); let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
@@ -1850,8 +1929,9 @@ impl LayeredTimeline {
// //
// Garbage collect the layer if all conditions are satisfied: // Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN; // 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns'; // 2. it is older than PITR interval;
// 3. newer on-disk image layers cover the layer's whole key range // 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
// //
let mut layers = self.layers.lock().unwrap(); let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() { 'outer: for l in layers.iter_historic_layers() {
@@ -1877,8 +1957,27 @@ impl LayeredTimeline {
result.layers_needed_by_cutoff += 1; result.layers_needed_by_cutoff += 1;
continue 'outer; continue 'outer;
} }
// 2. Is it newer than the PITR interval?
// 2. Is it needed by a child branch? // We use modification time of layer file to estimate update time.
// This estimate is not precise, but maintaining an LSN->timestamp map seems like overkill.
// Users are not expected to need high precision here, and the estimate is
// conservative: a file's modification time is always newer than the actual time the
// versions in it were created, so it errs on the side of keeping data.
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
debug!(
"keeping {} because it's modification time {:?} is newer than PITR {:?}",
l.filename().display(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
}
// 3. Is it needed by a child branch?
// NOTE With that wee would keep data that // NOTE With that wee would keep data that
// might be referenced by child branches forever. // might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when // We can track this in child timeline GC and delete parent layers when
@@ -1897,7 +1996,7 @@ impl LayeredTimeline {
} }
} }
// 3. Is there a later on-disk layer for this relation? // 4. Is there a later on-disk layer for this relation?
// //
// The end-LSN is exclusive, while disk_consistent_lsn is // The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is // inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -1938,7 +2037,7 @@ impl LayeredTimeline {
result.layers_removed += 1; result.layers_removed += 1;
} }
result.elapsed = now.elapsed(); result.elapsed = now.elapsed()?;
Ok(result) Ok(result)
} }
@@ -2215,7 +2314,8 @@ pub mod tests {
} }
let cutoff = tline.get_last_record_lsn(); let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?; tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?; tline.compact()?;
tline.gc()?; tline.gc()?;
@@ -2285,7 +2385,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC // Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn); println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn(); let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?; tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?; tline.compact()?;
tline.gc()?; tline.gc()?;
@@ -2362,7 +2462,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC // Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn); println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn(); let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?; tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?; tline.compact()?;
tline.gc()?; tline.gc()?;

View File

@@ -10,6 +10,7 @@ pub mod pgdatadir_mapping;
pub mod reltag; pub mod reltag;
pub mod remote_storage; pub mod remote_storage;
pub mod repository; pub mod repository;
pub mod tenant_config;
pub mod tenant_mgr; pub mod tenant_mgr;
pub mod tenant_threads; pub mod tenant_threads;
pub mod thread_mgr; pub mod thread_mgr;

View File

@@ -19,6 +19,7 @@ use std::net::TcpListener;
use std::str; use std::str;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, RwLockReadGuard}; use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
use tracing::*; use tracing::*;
use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::auth::{self, JwtAuth}; use zenith_utils::auth::{self, JwtAuth};
@@ -672,6 +673,37 @@ impl postgres_backend::Handler for PageServerHandler {
} }
} }
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
ensure!(params.len() == 1, "invalid param number for config command");
let tenantid = ZTenantId::from_str(params[0])?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"compaction_target_size"),
RowDescriptor::int8_col(b"compaction_period"),
RowDescriptor::int8_col(b"gc_horizon"),
RowDescriptor::int8_col(b"gc_period"),
RowDescriptor::int8_col(b"pitr_interval"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(tenant_conf.checkpoint_distance.to_string().as_bytes()),
Some(tenant_conf.compaction_target_size.to_string().as_bytes()),
Some(
tenant_conf
.compaction_period
.as_secs()
.to_string()
.as_bytes(),
),
Some(tenant_conf.gc_horizon.to_string().as_bytes()),
Some(tenant_conf.gc_period.as_secs().to_string().as_bytes()),
Some(tenant_conf.pitr_interval.as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") { } else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline. // Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/batch_others/test_gc.py. // FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
@@ -689,16 +721,21 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
let gc_horizon: u64 = caps let gc_horizon: u64 = caps
.get(4) .get(4)
.map(|h| h.as_str().parse()) .map(|h| h.as_str().parse())
.unwrap_or(Ok(self.conf.gc_horizon))?; .unwrap_or(Ok(tenant_conf.gc_horizon))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[ pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"), RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"), RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"), RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"), RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"), RowDescriptor::int8_col(b"layers_removed"),
@@ -707,6 +744,7 @@ impl postgres_backend::Handler for PageServerHandler {
.write_message_noflush(&BeMessage::DataRow(&[ .write_message_noflush(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()), Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()), Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()), Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()), Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()), Some(result.layers_removed.to_string().as_bytes()),

View File

@@ -1,5 +1,6 @@
use crate::layered_repository::metadata::TimelineMetadata; use crate::layered_repository::metadata::TimelineMetadata;
use crate::remote_storage::RemoteIndex; use crate::remote_storage::RemoteIndex;
use crate::tenant_config::TenantConf;
use crate::walrecord::ZenithWalRecord; use crate::walrecord::ZenithWalRecord;
use crate::CheckpointConfig; use crate::CheckpointConfig;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
@@ -249,6 +250,7 @@ pub trait Repository: Send + Sync {
&self, &self,
timelineid: Option<ZTimelineId>, timelineid: Option<ZTimelineId>,
horizon: u64, horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool, checkpoint_before_gc: bool,
) -> Result<GcResult>; ) -> Result<GcResult>;
@@ -261,6 +263,8 @@ pub trait Repository: Send + Sync {
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
fn get_remote_index(&self) -> &RemoteIndex; fn get_remote_index(&self) -> &RemoteIndex;
fn get_tenant_conf(&self) -> TenantConf;
} }
/// A timeline, that belongs to the current repository. /// A timeline, that belongs to the current repository.
@@ -303,6 +307,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
pub struct GcResult { pub struct GcResult {
pub layers_total: u64, pub layers_total: u64,
pub layers_needed_by_cutoff: u64, pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64, pub layers_needed_by_branches: u64,
pub layers_not_updated: u64, pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files. pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -313,6 +318,7 @@ pub struct GcResult {
impl AddAssign for GcResult { impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) { fn add_assign(&mut self, other: Self) {
self.layers_total += other.layers_total; self.layers_total += other.layers_total;
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff; self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches; self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_not_updated += other.layers_not_updated; self.layers_not_updated += other.layers_not_updated;
@@ -454,6 +460,7 @@ pub mod repo_harness {
pub struct RepoHarness<'a> { pub struct RepoHarness<'a> {
pub conf: &'static PageServerConf, pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: ZTenantId, pub tenant_id: ZTenantId,
pub lock_guard: ( pub lock_guard: (
@@ -485,12 +492,15 @@ pub mod repo_harness {
// OK in a test. // OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf)); let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenant_conf = TenantConf::dummy_conf();
let tenant_id = ZTenantId::generate(); let tenant_id = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?; fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.timelines_path(&tenant_id))?; fs::create_dir_all(conf.timelines_path(&tenant_id))?;
Ok(Self { Ok(Self {
conf, conf,
tenant_conf,
tenant_id, tenant_id,
lock_guard, lock_guard,
}) })
@@ -505,6 +515,7 @@ pub mod repo_harness {
let repo = LayeredRepository::new( let repo = LayeredRepository::new(
self.conf, self.conf,
self.tenant_conf,
walredo_mgr, walredo_mgr,
self.tenant_id, self.tenant_id,
RemoteIndex::empty(), RemoteIndex::empty(),
@@ -720,7 +731,7 @@ mod tests {
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing // FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check // and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail. // below should fail.
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// try to branch at lsn 25, should fail because we already garbage collected the data // try to branch at lsn 25, should fail because we already garbage collected the data
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) { match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
@@ -771,7 +782,7 @@ mod tests {
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?; make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn(); let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25)); assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) { match tline.get(*TEST_KEY, Lsn(0x25)) {
@@ -794,7 +805,7 @@ mod tests {
.get_timeline_load(NEW_TIMELINE_ID) .get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline"); .expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok()); assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(()) Ok(())
@@ -813,7 +824,7 @@ mod tests {
make_some_layers(newtline.as_ref(), Lsn(0x60))?; make_some_layers(newtline.as_ref(), Lsn(0x60))?;
// run gc on parent // run gc on parent
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// Check that the data is still accessible on the branch. // Check that the data is still accessible on the branch.
assert_eq!( assert_eq!(

View File

@@ -0,0 +1,220 @@
//! Functions for handling per-tenant configuration options
//!
//! If a tenant is created with the --config option,
//! the tenant-specific config is stored in the tenant's directory.
//! Otherwise, the global pageserver config is used.
//!
//! If the tenant config file is corrupted, the tenant is disabled.
//! We cannot fall back to the global or default config, because wrong settings
//! may lead to data loss.
//!
use crate::config::PageServerConf;
use crate::STORAGE_FORMAT_VERSION;
use anyhow::ensure;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::zid::ZTenantId;
pub const TENANT_CONFIG_NAME: &str = "config";
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
pub checkpoint_distance: u64,
pub compaction_target_size: u64,
pub compaction_period: Duration,
pub gc_horizon: u64,
pub gc_period: Duration,
pub pitr_interval: Duration,
}
/// We assume that a write of up to TENANTCONF_MAX_SIZE bytes is atomic.
///
/// This is the same assumption that PostgreSQL makes with the control file,
/// see PG_CONTROL_MAX_SAFE_SIZE
const TENANTCONF_MAX_SIZE: usize = 512;
/// TenantConfFile is stored on disk in tenant's directory
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantConfFile {
hdr: TenantConfHeader,
pub body: TenantConf,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TenantConfHeader {
checksum: u32, // CRC of serialized tenantconf body
size: u16, // size of serialized tenantconf
format_version: u16, // storage format version (used for compatibility checks)
}
const TENANTCONF_HDR_SIZE: usize = std::mem::size_of::<TenantConfHeader>();
impl TenantConfFile {
pub fn new(
checkpoint_distance: u64,
compaction_target_size: u64,
compaction_period: Duration,
gc_horizon: u64,
gc_period: Duration,
pitr_interval: Duration,
) -> Self {
Self {
hdr: TenantConfHeader {
checksum: 0,
size: 0,
format_version: STORAGE_FORMAT_VERSION,
},
body: TenantConf {
gc_period,
gc_horizon,
pitr_interval,
checkpoint_distance,
compaction_period,
compaction_target_size,
},
}
}
pub fn from(tconf: TenantConf) -> Self {
Self {
hdr: TenantConfHeader {
checksum: 0,
size: 0,
format_version: STORAGE_FORMAT_VERSION,
},
body: TenantConf {
gc_period: tconf.gc_period,
gc_horizon: tconf.gc_horizon,
pitr_interval: tconf.pitr_interval,
checkpoint_distance: tconf.checkpoint_distance,
compaction_period: tconf.compaction_period,
compaction_target_size: tconf.compaction_target_size,
},
}
}
pub fn from_bytes(tenantconf_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!(
tenantconf_bytes.len() == TENANTCONF_MAX_SIZE,
"tenantconf bytes size is wrong"
);
let hdr = TenantConfHeader::des(&tenantconf_bytes[0..TENANTCONF_HDR_SIZE])?;
ensure!(
hdr.format_version == STORAGE_FORMAT_VERSION,
"format version mismatch"
);
let tenantconf_size = hdr.size as usize;
ensure!(
tenantconf_size <= TENANTCONF_MAX_SIZE,
"corrupted tenantconf file"
);
let calculated_checksum =
crc32c::crc32c(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size]);
ensure!(
hdr.checksum == calculated_checksum,
"tenantconf checksum mismatch"
);
let body = TenantConf::des(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size])?;
Ok(TenantConfFile { hdr, body })
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let body_bytes = self.body.ser()?;
let tenantconf_size = TENANTCONF_HDR_SIZE + body_bytes.len();
let hdr = TenantConfHeader {
size: tenantconf_size as u16,
format_version: STORAGE_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut tenantconf_bytes = vec![0u8; TENANTCONF_MAX_SIZE];
tenantconf_bytes[0..TENANTCONF_HDR_SIZE].copy_from_slice(&hdr_bytes);
tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size].copy_from_slice(&body_bytes);
Ok(tenantconf_bytes)
}
}
impl TenantConf {
pub fn default() -> TenantConf {
use defaults::*;
TenantConf {
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
}
}
/// Points to the place in the pageserver's local directory
/// where the given tenant's tenantconf file is located.
pub fn tenantconf_path(conf: &'static PageServerConf, tenantid: ZTenantId) -> PathBuf {
conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME)
}
#[cfg(test)]
pub fn dummy_conf() -> Self {
TenantConf {
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
pitr_interval: Duration::from_secs(60 * 60),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tenantconf_serializes_correctly() {
let original_tenantconf = TenantConfFile::new(
111,
111,
Duration::from_secs(111),
222,
Duration::from_secs(111),
Duration::from_secs(60 * 60),
);
let tenantconf_bytes = original_tenantconf
.to_bytes()
.expect("Should serialize correct tenantconf to bytes");
let deserialized_tenantconf = TenantConfFile::from_bytes(&tenantconf_bytes)
.expect("Should deserialize its own bytes");
assert_eq!(
deserialized_tenantconf.body, original_tenantconf.body,
"Tenantconf that was serialized to bytes and deserialized back should not change"
);
}
}

View File

@@ -5,6 +5,7 @@ use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository; use crate::layered_repository::LayeredRepository;
use crate::remote_storage::RemoteIndex; use crate::remote_storage::RemoteIndex;
use crate::repository::{Repository, TimelineSyncStatusUpdate}; use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::tenant_config::TenantConf;
use crate::thread_mgr; use crate::thread_mgr;
use crate::thread_mgr::ThreadKind; use crate::thread_mgr::ThreadKind;
use crate::timelines; use crate::timelines;
@@ -63,7 +64,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
TENANTS.lock().unwrap() TENANTS.lock().unwrap()
} }
// Sets up wal redo manager and repository for tenant. Reduces code duplocation. // Sets up wal redo manager and repository for tenant. Reduces code duplication.
// Used during pageserver startup, or when new tenant is attached to pageserver. // Used during pageserver startup, or when new tenant is attached to pageserver.
pub fn load_local_repo( pub fn load_local_repo(
conf: &'static PageServerConf, conf: &'static PageServerConf,
@@ -75,9 +76,15 @@ pub fn load_local_repo(
// Set up a WAL redo manager, for applying WAL records. // Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Try to load config file
let tenant_conf = LayeredRepository::load_tenantconf(conf, tenant_id)
.with_context(|| format!("Failed to load tenant state for id {}", tenant_id))
.unwrap();
// Set up an object repository, for actual data storage. // Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new( let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
conf, conf,
tenant_conf,
Arc::new(walredo_mgr), Arc::new(walredo_mgr),
tenant_id, tenant_id,
remote_index.clone(), remote_index.clone(),
@@ -174,6 +181,7 @@ pub fn shutdown_all_tenants() {
pub fn create_tenant_repository( pub fn create_tenant_repository(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenantid: ZTenantId, tenantid: ZTenantId,
remote_index: RemoteIndex, remote_index: RemoteIndex,
) -> Result<Option<ZTenantId>> { ) -> Result<Option<ZTenantId>> {
@@ -186,6 +194,7 @@ pub fn create_tenant_repository(
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = timelines::create_repo( let repo = timelines::create_repo(
conf, conf,
tenant_conf,
tenantid, tenantid,
CreateRepo::Real { CreateRepo::Real {
wal_redo_manager, wal_redo_manager,
@@ -210,7 +219,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
/// Change the state of a tenant to Active and launch its compactor and GC /// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing. /// threads. If the tenant was already in Active state or Stopping, does nothing.
/// ///
pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> { pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
let mut m = access_tenants(); let mut m = access_tenants();
let tenant = m let tenant = m
.get_mut(&tenant_id) .get_mut(&tenant_id)
@@ -230,7 +239,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None, None,
"Compactor thread", "Compactor thread",
true, true,
move || crate::tenant_threads::compact_loop(tenant_id, conf), move || crate::tenant_threads::compact_loop(tenant_id),
)?; )?;
let gc_spawn_result = thread_mgr::spawn( let gc_spawn_result = thread_mgr::spawn(
@@ -239,7 +248,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None, None,
"GC thread", "GC thread",
true, true,
move || crate::tenant_threads::gc_loop(tenant_id, conf), move || crate::tenant_threads::gc_loop(tenant_id),
) )
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id)); .with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
@@ -251,7 +260,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result; return gc_spawn_result;
} }
tenant.state = TenantState::Active; tenant.state = TenantState::Active;
} }
@@ -290,7 +298,7 @@ pub fn get_timeline_for_tenant_load(
.get_timeline_load(timelineid) .get_timeline_load(timelineid)
.with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?; .with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?;
let repartition_distance = tenant.repo.conf.checkpoint_distance / 10; let repartition_distance = tenant.repo.get_tenant_conf().checkpoint_distance / 10;
let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance)); let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
page_tline.init_logical_size()?; page_tline.init_logical_size()?;

View File

@@ -1,6 +1,5 @@
//! This module contains functions to serve per-tenant background processes, //! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC //! such as compaction and GC
use crate::config::PageServerConf;
use crate::repository::Repository; use crate::repository::Repository;
use crate::tenant_mgr; use crate::tenant_mgr;
use crate::tenant_mgr::TenantState; use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use zenith_utils::zid::ZTenantId;
/// ///
/// Compaction thread's main loop /// Compaction thread's main loop
/// ///
pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid, conf) { if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err); error!("compact loop terminated with error: {:?}", err);
Err(err) Err(err)
} else { } else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
} }
} }
fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop { loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break; break;
} }
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
std::thread::sleep(conf.compaction_period); std::thread::sleep(tenant_conf.compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid); trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines // Compact timelines
@@ -46,23 +47,29 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
/// ///
/// GC thread's main loop /// GC thread's main loop
/// ///
pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> { pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop { loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break; break;
} }
trace!("gc thread for tenant {} waking up", tenantid); trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_conf = repo.get_tenant_conf();
// Garbage collect old files that are not needed for PITR anymore // Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 { if tenant_conf.gc_horizon > 0 {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; repo.gc_iteration(
repo.gc_iteration(None, conf.gc_horizon, false)?; None,
tenant_conf.gc_horizon,
tenant_conf.pitr_interval,
false,
)?;
} }
// TODO Write it in more adequate way using // TODO Write it in more adequate way using
// condvar.wait_timeout() or something // condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs(); let mut sleep_time = tenant_conf.gc_period.as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active) while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{ {
sleep_time -= 1; sleep_time -= 1;

View File

@@ -23,6 +23,7 @@ use crate::{
layered_repository::metadata::TimelineMetadata, layered_repository::metadata::TimelineMetadata,
remote_storage::RemoteIndex, remote_storage::RemoteIndex,
repository::{LocalTimelineState, Repository}, repository::{LocalTimelineState, Repository},
tenant_config::TenantConf,
DatadirTimeline, RepositoryImpl, DatadirTimeline, RepositoryImpl,
}; };
use crate::{import_datadir, LOG_FILE_NAME}; use crate::{import_datadir, LOG_FILE_NAME};
@@ -149,8 +150,8 @@ pub fn init_pageserver(
if let Some(tenant_id) = create_tenant { if let Some(tenant_id) = create_tenant {
println!("initializing tenantid {}", tenant_id); println!("initializing tenantid {}", tenant_id);
let repo = let repo = create_repo(conf, TenantConf::default(), tenant_id, CreateRepo::Dummy)
create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?; .context("failed to create repo")?;
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate); let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref()) bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
.context("failed to create initial timeline")?; .context("failed to create initial timeline")?;
@@ -173,6 +174,7 @@ pub enum CreateRepo {
pub fn create_repo( pub fn create_repo(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_conf: TenantConf,
tenant_id: ZTenantId, tenant_id: ZTenantId,
create_repo: CreateRepo, create_repo: CreateRepo,
) -> Result<Arc<RepositoryImpl>> { ) -> Result<Arc<RepositoryImpl>> {
@@ -209,8 +211,12 @@ pub fn create_repo(
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?; crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
info!("created directory structure in {}", repo_dir.display()); info!("created directory structure in {}", repo_dir.display());
// Save tenant's config
LayeredRepository::save_tenantconf(conf, tenant_id, tenant_conf, true)?;
Ok(Arc::new(LayeredRepository::new( Ok(Arc::new(LayeredRepository::new(
conf, conf,
tenant_conf,
wal_redo_manager, wal_redo_manager,
tenant_id, tenant_id,
remote_index, remote_index,

View File

@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
receivers.insert((tenantid, timelineid), receiver); receivers.insert((tenantid, timelineid), receiver);
// Update tenant state and start tenant threads, if they are not running yet. // Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::activate_tenant(conf, tenantid)?; tenant_mgr::activate_tenant(tenantid)?;
} }
}; };
Ok(()) Ok(())

View File

@@ -0,0 +1,31 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import ZenithEnvBuilder
def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
"""Test per-tenant configuration"""
env = zenith_env_builder.init_start()
tenant = env.zenith_cli.create_tenant(
conf={
'gc_period': '100sec',
'gc_horizon': '1024',
'pitr_interval': '3600sec',
'checkpoint_distance': '10000',
'compaction_period': '60sec',
'compaction_target_size': '1048576'
})
env.zenith_cli.create_timeline(f'test_tenant_conf', tenant_id=tenant)
pg = env.postgres.create_start(
"test_tenant_conf",
"main",
tenant,
)
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"show {tenant.hex}")
assert pscur.fetchone() == (10000, 1048576, 60, 1024, 100, 3600)

View File

@@ -853,13 +853,20 @@ class ZenithCli:
self.env = env self.env = env
pass pass
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID: def create_tenant(self,
tenant_id: Optional[uuid.UUID] = None,
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
""" """
Creates a new tenant, returns its id and its initial timeline's id. Creates a new tenant, returns its id and its initial timeline's id.
""" """
if tenant_id is None: if tenant_id is None:
tenant_id = uuid.uuid4() tenant_id = uuid.uuid4()
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex]) if conf is None:
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
else:
res = self.raw_cli(
['tenant', 'create', '--tenant-id', tenant_id.hex] +
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res.check_returncode() res.check_returncode()
return tenant_id return tenant_id

View File

@@ -164,7 +164,8 @@ fn main() -> Result<()> {
.subcommand(App::new("create") .subcommand(App::new("create")
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
) .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
) )
.subcommand( .subcommand(
App::new("pageserver") App::new("pageserver")
@@ -521,8 +522,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
} }
Some(("create", create_match)) => { Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?; let initial_tenant_id = parse_tenant_id(create_match)?;
let tenant_conf: HashMap<_, _> = create_match
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver let new_tenant_id = pageserver
.tenant_create(initial_tenant_id)? .tenant_create(initial_tenant_id, tenant_conf)?
.ok_or_else(|| { .ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id) anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?; })?;
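The -c key:value arguments above are split into a map before being handed to tenant_create(); a standalone sketch of that parsing (same flat_map/split_once logic as the diff, inputs illustrative):

use std::collections::HashMap;

fn main() {
    // What "zenith tenant create -c gc_horizon:1024 -c pitr_interval:3600sec" yields.
    let values = ["gc_horizon:1024", "pitr_interval:3600sec"];
    let tenant_conf: HashMap<&str, &str> =
        values.iter().flat_map(|c| c.split_once(':')).collect();
    assert_eq!(tenant_conf.get("gc_horizon"), Some(&"1024"));
    assert_eq!(tenant_conf.len(), 2);
}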