Make it possible to specify per-tenant configuration parameters

Add tenant config API and 'zenith tenant config' CLI command.
Add a 'show' query to the pageserver protocol for tenant-specific config parameters

Refactoring: move tenant_config code to a separate module.
Save the tenant conf file to the tenant's directory when the tenant is created, so it can be recovered on pageserver restart.
Ignore errors during tenant config loading while it is not yet supported by the console.

Define a PITR interval for GC.

Refs #1320
Konstantin Knizhnik
2022-02-25 19:33:44 +03:00
committed by Anastasia Lubennikova
parent a4700c9bbe
commit 5f83c9290b
18 changed files with 915 additions and 208 deletions
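
As a rough client-side sketch of what the new config API enables (not part of the patch): the request below mirrors what 'zenith tenant config' sends through PageServerNode::tenant_config. The base URL is a placeholder, and any auth header the pageserver may require is omitted.

use pageserver::http::models::TenantConfigRequest;
use utils::zid::ZTenantId;

// Hypothetical helper: tighten one tenant's GC/PITR settings through the new
// `PUT /v1/tenant/config` endpoint. The values here are examples only.
fn set_tenant_retention(http_base_url: &str, tenant_id: ZTenantId) -> anyhow::Result<()> {
    // Start from the all-None request and override only the settings we care about.
    let mut request = TenantConfigRequest::new(tenant_id);
    request.gc_horizon = Some(64 * 1024 * 1024);
    request.pitr_interval = Some("30 days".to_string());
    reqwest::blocking::Client::new()
        .put(format!("{}/v1/tenant/config", http_base_url))
        .json(&request)
        .send()?
        .error_for_status()?;
    Ok(())
}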

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
@@ -9,7 +10,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest};
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -344,10 +345,32 @@ impl PageServerNode {
pub fn tenant_create(
&self,
new_tenant_id: Option<ZTenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest { new_tenant_id })
.json(&TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()?,
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()?,
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.get("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
gc_horizon: settings
.get("gc_horizon")
.map(|x| x.parse::<u64>())
.transpose()?,
gc_period: settings.get("gc_period").map(|x| x.to_string()),
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
})
.send()?
.error_from_body()?
.json::<Option<String>>()?;
@@ -364,6 +387,32 @@ impl PageServerNode {
.transpose()
}
pub fn tenant_config(&self, tenant_id: ZTenantId, settings: HashMap<&str, &str>) -> Result<()> {
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))
.json(&TenantConfigRequest {
tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>().unwrap()),
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>().unwrap()),
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.get("compaction_threshold")
.map(|x| x.parse::<usize>().unwrap()),
gc_horizon: settings
.get("gc_horizon")
.map(|x| x.parse::<u64>().unwrap()),
gc_period: settings.get("gc_period").map(|x| x.to_string()),
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
})
.send()?
.error_from_body()?;
Ok(())
}
pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result<Vec<TimelineInfo>> {
let timeline_infos: Vec<TimelineInfo> = self
.http_request(

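A minimal usage sketch of the tenant_config helper above, as it might be called from the 'zenith tenant config' CLI handler. The keys match the settings map parsed by tenant_create/tenant_config; the values are placeholders.

use std::collections::HashMap;

// Hypothetical call site; assumes a `PageServerNode` handle and the
// anyhow::Result alias used in this file.
fn bump_tenant_gc_settings(pageserver: &PageServerNode, tenant_id: ZTenantId) -> anyhow::Result<()> {
    let mut settings: HashMap<&str, &str> = HashMap::new();
    settings.insert("gc_horizon", "67108864");  // bytes of recent WAL to retain
    settings.insert("gc_period", "100 s");      // how often GC runs
    settings.insert("pitr_interval", "7 days"); // time-based retention window
    pageserver.tenant_config(tenant_id, settings)
}
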
View File

@@ -246,11 +246,12 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
// initialize local tenant
let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index);
let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index)
.with_context(|| format!("Failed to load repo for tenant {}", tenant_id))?;
for (timeline_id, init_status) in local_timeline_init_statuses {
match init_status {
remote_storage::LocalTimelineInitStatus::LocallyComplete => {
debug!("timeline {} for tenant {} is locally complete, registering it in repository", tenant_id, timeline_id);
debug!("timeline {} for tenant {} is locally complete, registering it in repository", timeline_id, tenant_id);
// Lets fail here loudly to be on the safe side.
// XXX: It may be a better api to actually distinguish between repository startup
// and processing of newly downloaded timelines.

View File

@@ -5,6 +5,12 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{bail, ensure, Context, Result};
use std::convert::TryInto;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use toml_edit;
use toml_edit::{Document, Item};
use utils::{
@@ -12,16 +18,11 @@ use utils::{
zid::{ZNodeId, ZTenantId, ZTimelineId},
};
use std::convert::TryInto;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};
pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
@@ -29,21 +30,6 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -64,14 +50,6 @@ pub mod defaults {
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
@@ -80,6 +58,16 @@ pub mod defaults {
# initial superuser role name to use when creating a new tenant
#initial_superuser_name = '{DEFAULT_SUPERUSER}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
# [remote_storage]
"###
@@ -97,25 +85,6 @@ pub struct PageServerConf {
/// Example (default): 127.0.0.1:9898
pub listen_http_addr: String,
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
pub compaction_period: Duration,
// Level0 delta layer threshold for compaction.
pub compaction_threshold: usize,
pub gc_horizon: u64,
pub gc_period: Duration,
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete.
@@ -142,6 +111,7 @@ pub struct PageServerConf {
pub remote_storage_config: Option<RemoteStorageConfig>,
pub profiling: ProfilingConfig,
pub default_tenant_conf: TenantConf,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -185,15 +155,6 @@ struct PageServerConfigBuilder {
listen_http_addr: BuilderValue<String>,
checkpoint_distance: BuilderValue<u64>,
compaction_target_size: BuilderValue<u64>,
compaction_period: BuilderValue<Duration>,
compaction_threshold: BuilderValue<usize>,
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,
@@ -224,14 +185,6 @@ impl Default for PageServerConfigBuilder {
Self {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE),
compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period")),
compaction_threshold: Set(DEFAULT_COMPACTION_THRESHOLD),
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -261,30 +214,6 @@ impl PageServerConfigBuilder {
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
}
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
pub fn compaction_target_size(&mut self, compaction_target_size: u64) {
self.compaction_target_size = BuilderValue::Set(compaction_target_size)
}
pub fn compaction_period(&mut self, compaction_period: Duration) {
self.compaction_period = BuilderValue::Set(compaction_period)
}
pub fn compaction_threshold(&mut self, compaction_threshold: usize) {
self.compaction_threshold = BuilderValue::Set(compaction_threshold)
}
pub fn gc_horizon(&mut self, gc_horizon: u64) {
self.gc_horizon = BuilderValue::Set(gc_horizon)
}
pub fn gc_period(&mut self, gc_period: Duration) {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}
@@ -344,22 +273,6 @@ impl PageServerConfigBuilder {
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
compaction_target_size: self
.compaction_target_size
.ok_or(anyhow::anyhow!("missing compaction_target_size"))?,
compaction_period: self
.compaction_period
.ok_or(anyhow::anyhow!("missing compaction_period"))?,
compaction_threshold: self
.compaction_threshold
.ok_or(anyhow::anyhow!("missing compaction_threshold"))?,
gc_horizon: self
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
@@ -386,6 +299,8 @@ impl PageServerConfigBuilder {
.ok_or(anyhow::anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow::anyhow!("missing id"))?,
profiling: self.profiling.ok_or(anyhow::anyhow!("missing profiling"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
})
}
}
@@ -488,20 +403,12 @@ impl PageServerConf {
let mut builder = PageServerConfigBuilder::default();
builder.workdir(workdir.to_owned());
let mut t_conf: TenantConfOpt = Default::default();
for (key, item) in toml.iter() {
match key {
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
"compaction_target_size" => {
builder.compaction_target_size(parse_toml_u64(key, item)?)
}
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
"compaction_threshold" => {
builder.compaction_threshold(parse_toml_u64(key, item)? as usize)
}
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -519,6 +426,9 @@ impl PageServerConf {
"remote_storage" => {
builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
}
"tenant_conf" => {
t_conf = Self::parse_toml_tenant_conf(item)?;
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
_ => bail!("unrecognized pageserver option '{}'", key),
@@ -547,9 +457,42 @@ impl PageServerConf {
);
}
conf.default_tenant_conf = t_conf.merge(TenantConf::default());
Ok(conf)
}
// subroutine of parse_and_validate to parse `[tenant_conf]` section
pub fn parse_toml_tenant_conf(item: &toml_edit::Item) -> Result<TenantConfOpt> {
let mut t_conf: TenantConfOpt = Default::default();
for (key, item) in item
.as_table()
.ok_or(anyhow::anyhow!("invalid tenant config"))?
.iter()
{
match key {
"checkpoint_distance" => {
t_conf.checkpoint_distance = Some(parse_toml_u64(key, item)?)
}
"compaction_target_size" => {
t_conf.compaction_target_size = Some(parse_toml_u64(key, item)?)
}
"compaction_period" => {
t_conf.compaction_period = Some(parse_toml_duration(key, item)?)
}
"compaction_threshold" => {
t_conf.compaction_threshold = Some(parse_toml_u64(key, item)? as usize)
}
"gc_horizon" => t_conf.gc_horizon = Some(parse_toml_u64(key, item)?),
"gc_period" => t_conf.gc_period = Some(parse_toml_duration(key, item)?),
"pitr_interval" => t_conf.pitr_interval = Some(parse_toml_duration(key, item)?),
_ => bail!("unrecognized tenant config option '{}'", key),
}
}
Ok(t_conf)
}
/// subroutine of parse_config(), to parse the `[remote_storage]` table.
fn parse_remote_storage_config(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
let local_path = toml.get("local_path");
@@ -635,12 +578,6 @@ impl PageServerConf {
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
PageServerConf {
id: ZNodeId(0),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -654,6 +591,7 @@ impl PageServerConf {
auth_validation_public_key_path: None,
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::dummy_conf(),
}
}
}
@@ -711,15 +649,6 @@ mod tests {
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
checkpoint_distance = 111 # in bytes
compaction_target_size = 111 # in bytes
compaction_period = '111 s'
compaction_threshold = 2
gc_period = '222 s'
gc_horizon = 222
wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'
@@ -751,12 +680,6 @@ id = 10
id: ZNodeId(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -768,6 +691,7 @@ id = 10
auth_validation_public_key_path: None,
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -798,12 +722,6 @@ id = 10
id: ZNodeId(10),
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
checkpoint_distance: 111,
compaction_target_size: 111,
compaction_period: Duration::from_secs(111),
compaction_threshold: 2,
gc_horizon: 222,
gc_period: Duration::from_secs(222),
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(),
@@ -815,6 +733,7 @@ id = 10
auth_validation_public_key_path: None,
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
},
"Should be able to parse all basic config values correctly"
);

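For illustration only (not taken from the patch), this is roughly how a [tenant_conf] table in pageserver.toml becomes the per-tenant defaults, using the parse_toml_tenant_conf and TenantConfOpt::merge helpers shown above. The TOML values are made up.

use crate::config::PageServerConf;
use crate::tenant_config::{TenantConf, TenantConfOpt};

// Sketch of the parse-then-merge flow for the `[tenant_conf]` table.
fn effective_tenant_defaults() -> anyhow::Result<TenantConf> {
    let toml = r#"
[tenant_conf]
checkpoint_distance = 268435456
gc_horizon = 67108864
pitr_interval = '30 days'
"#;
    let doc = toml.parse::<toml_edit::Document>()?;
    let overrides = match doc.as_table().get("tenant_conf") {
        Some(item) => PageServerConf::parse_toml_tenant_conf(item)?,
        None => TenantConfOpt::default(),
    };
    // Any field left unset falls back to the built-in tenant defaults.
    Ok(overrides.merge(TenantConf::default()))
}
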
View File

@@ -20,11 +20,18 @@ pub struct TimelineCreateRequest {
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Default)]
pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<ZTenantId>,
pub checkpoint_distance: Option<u64>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub pitr_interval: Option<String>,
}
#[serde_as]
@@ -36,3 +43,42 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
pub struct StatusResponse {
pub id: ZNodeId,
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
..Default::default()
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TenantConfigRequest {
pub tenant_id: ZTenantId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub checkpoint_distance: Option<u64>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub pitr_interval: Option<String>,
}
impl TenantConfigRequest {
pub fn new(tenant_id: ZTenantId) -> TenantConfigRequest {
TenantConfigRequest {
tenant_id,
checkpoint_distance: None,
compaction_target_size: None,
compaction_period: None,
compaction_threshold: None,
gc_horizon: None,
gc_period: None,
pitr_interval: None,
}
}
}

View File

@@ -328,11 +328,7 @@ paths:
content:
application/json:
schema:
type: object
properties:
new_tenant_id:
type: string
format: hex
$ref: "#/components/schemas/TenantCreateInfo"
responses:
"201":
description: New tenant created successfully
@@ -371,7 +367,48 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/config:
put:
description: |
Update tenant's config.
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/TenantConfigInfo"
responses:
"200":
description: OK
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TenantInfo"
"400":
description: Malformed tenant config request
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
components:
securitySchemes:
JWT:
@@ -389,6 +426,45 @@ components:
type: string
state:
type: string
TenantCreateInfo:
type: object
properties:
new_tenant_id:
type: string
format: hex
tenant_id:
type: string
format: hex
gc_period:
type: string
gc_horizon:
type: integer
pitr_interval:
type: string
checkpoint_distance:
type: integer
compaction_period:
type: string
compaction_threshold:
type: string
TenantConfigInfo:
type: object
properties:
tenant_id:
type: string
format: hex
gc_period:
type: string
gc_horizon:
type: integer
pitr_interval:
type: string
checkpoint_distance:
type: integer
compaction_period:
type: string
compaction_threshold:
type: string
TimelineInfo:
type: object
required:

View File

@@ -6,13 +6,15 @@ use hyper::{Body, Request, Response, Uri};
use tracing::*;
use super::models::{
StatusResponse, TenantCreateRequest, TenantCreateResponse, TimelineCreateRequest,
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse,
TimelineCreateRequest,
};
use crate::config::RemoteStorageKind;
use crate::remote_storage::{
download_index_part, schedule_timeline_download, LocalFs, RemoteIndex, RemoteTimeline, S3Bucket,
};
use crate::repository::Repository;
use crate::tenant_config::TenantConfOpt;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::{config::PageServerConf, tenant_mgr, timelines};
use utils::{
@@ -375,6 +377,27 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let remote_index = get_state(&request).remote_index.clone();
let mut tenant_conf: TenantConfOpt = Default::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period =
Some(humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval =
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);
}
let target_tenant_id = request_data
.new_tenant_id
.map(ZTenantId::from)
@@ -382,8 +405,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
let conf = get_config(&request);
tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index)
tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
})
.await
.map_err(ApiError::from_err)??;
@@ -394,6 +418,44 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
})
}
async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TenantConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
// check for management permission
check_permission(&request, Some(tenant_id))?;
let mut tenant_conf: TenantConfOpt = Default::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period =
Some(humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval =
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?);
}
tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_config", tenant = ?tenant_id).entered();
tenant_mgr::update_tenant_config(tenant_conf, tenant_id)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::OK, ())?)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -426,6 +488,7 @@ pub fn make_router(
.get("/v1/status", status_handler)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.put("/v1/tenant/config", tenant_config_handler)
.get("/v1/tenant/:tenant_id/timeline", timeline_list_handler)
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.get(

View File

@@ -29,11 +29,13 @@ use std::ops::{Bound::Included, Deref, Range};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::Instant;
use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf;
use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::page_cache;
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
use crate::repository::{
@@ -51,6 +53,7 @@ use metrics::{
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};
use toml_edit;
use utils::{
crashsafe_dir,
lsn::{AtomicLsn, Lsn, RecordLsn},
@@ -149,7 +152,15 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
/// Repository consists of multiple timelines. Keep them in a hash table.
///
pub struct LayeredRepository {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
// Overridden tenant-specific config parameters.
// We keep the TenantConfOpt struct here to preserve the information
// about parameters that are not set.
// This is necessary to allow global config updates.
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC.
@@ -219,6 +230,7 @@ impl Repository for LayeredRepository {
let timeline = LayeredTimeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
None,
timelineid,
@@ -302,6 +314,7 @@ impl Repository for LayeredRepository {
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let timeline_str = target_timelineid
@@ -311,7 +324,7 @@ impl Repository for LayeredRepository {
STORAGE_TIME
.with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
})
}
@@ -480,6 +493,64 @@ impl From<LayeredTimelineEntry> for RepositoryTimeline<LayeredTimeline> {
/// Private functions
impl LayeredRepository {
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
pub fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
pub fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_period
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
}
pub fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_horizon
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
}
pub fn get_gc_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_period
.unwrap_or(self.conf.default_tenant_conf.gc_period)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> {
let mut tenant_conf = self.tenant_conf.write().unwrap();
tenant_conf.update(&new_tenant_conf);
LayeredRepository::persist_tenant_config(self.conf, self.tenantid, *tenant_conf)?;
Ok(())
}
// Implementation of the public `get_timeline` function.
// Differences from the public:
// * interface in that the caller must already hold the mutex on the 'timelines' hashmap.
@@ -553,8 +624,10 @@ impl LayeredRepository {
.flatten()
.map(LayeredTimelineEntry::Loaded);
let _enter = info_span!("loading local timeline").entered();
let timeline = LayeredTimeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
ancestor,
timelineid,
@@ -571,6 +644,7 @@ impl LayeredRepository {
pub fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenantid: ZTenantId,
remote_index: RemoteIndex,
@@ -579,6 +653,7 @@ impl LayeredRepository {
LayeredRepository {
tenantid,
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
@@ -587,6 +662,71 @@ impl LayeredRepository {
}
}
/// Locate and load config
pub fn load_tenant_config(
conf: &'static PageServerConf,
tenantid: ZTenantId,
) -> anyhow::Result<TenantConfOpt> {
let target_config_path = TenantConf::path(conf, tenantid);
info!("load tenantconf from {}", target_config_path.display());
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
if !target_config_path.exists() {
info!(
"Zenith tenant config is not found in {}",
target_config_path.display()
);
return Ok(Default::default());
}
// load and parse file
let config = fs::read_to_string(target_config_path)?;
let toml = config.parse::<toml_edit::Document>()?;
let mut tenant_conf: TenantConfOpt = Default::default();
for (key, item) in toml.iter() {
match key {
"tenant_conf" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item)?;
}
_ => bail!("unrecognized pageserver option '{}'", key),
}
}
Ok(tenant_conf)
}
pub fn persist_tenant_config(
conf: &'static PageServerConf,
tenantid: ZTenantId,
tenant_conf: TenantConfOpt,
) -> anyhow::Result<()> {
let _enter = info_span!("saving tenantconf").entered();
let target_config_path = TenantConf::path(conf, tenantid);
info!("save tenantconf to {}", target_config_path.display());
let mut conf_content = r#"# This file contains a tenant-specific config.
# It is read when the pageserver restarts.
# [tenant_config]
"#
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
fs::write(&target_config_path, conf_content).with_context(|| {
format!(
"Failed to write config file into path '{}'",
target_config_path.display()
)
})
}
/// Save timeline metadata to file
fn save_metadata(
conf: &'static PageServerConf,
@@ -662,6 +802,7 @@ impl LayeredRepository {
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let _span_guard =
@@ -738,7 +879,7 @@ impl LayeredRepository {
timeline.checkpoint(CheckpointConfig::Forced)?;
info!("timeline {} checkpoint_before_gc done", timelineid);
}
timeline.update_gc_info(branchpoints, cutoff);
timeline.update_gc_info(branchpoints, cutoff, pitr);
let result = timeline.gc()?;
totals += result;
@@ -753,6 +894,7 @@ impl LayeredRepository {
pub struct LayeredTimeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenantid: ZTenantId,
timelineid: ZTimelineId,
@@ -857,6 +999,11 @@ struct GcInfo {
///
/// FIXME: is this inclusive or exclusive?
cutoff: Lsn,
/// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
/// minus 'pitr_interval'
///
pitr: Duration,
}
/// Public interface functions
@@ -987,12 +1134,34 @@ impl Timeline for LayeredTimeline {
}
impl LayeredTimeline {
fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
#[allow(clippy::too_many_arguments)]
fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
metadata: TimelineMetadata,
ancestor: Option<LayeredTimelineEntry>,
timelineid: ZTimelineId,
@@ -1036,6 +1205,7 @@ impl LayeredTimeline {
LayeredTimeline {
conf,
tenant_conf,
timelineid,
tenantid,
layers: RwLock::new(LayerMap::default()),
@@ -1071,6 +1241,7 @@ impl LayeredTimeline {
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
cutoff: Lsn(0),
pitr: Duration::ZERO,
}),
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
@@ -1431,7 +1602,7 @@ impl LayeredTimeline {
let last_lsn = self.get_last_record_lsn();
let distance = last_lsn.widening_sub(self.last_freeze_at.load());
if distance >= self.conf.checkpoint_distance.into() {
if distance >= self.get_checkpoint_distance().into() {
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
}
@@ -1640,13 +1811,15 @@ impl LayeredTimeline {
// above. Rewrite it.
let _compaction_cs = self.compaction_cs.lock().unwrap();
let target_file_size = self.conf.checkpoint_distance;
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
{
let (partitioning, lsn) =
pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?;
let (partitioning, lsn) = pgdir.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
)?;
let timer = self.create_images_time_histo.start_timer();
// 2. Create new image layers for partitions that have been modified
// "enough".
@@ -1747,7 +1920,7 @@ impl LayeredTimeline {
// We compact or "shuffle" the level-0 delta layers when they've
// accumulated over the compaction threshold.
if level0_deltas.len() < self.conf.compaction_threshold {
if level0_deltas.len() < self.get_compaction_threshold() {
return Ok(());
}
drop(layers);
@@ -1870,10 +2043,11 @@ impl LayeredTimeline {
/// the latest LSN subtracted by a constant, and doesn't do anything smart
/// to figure out what read-only nodes might actually need.)
///
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn) {
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
let mut gc_info = self.gc_info.write().unwrap();
gc_info.retain_lsns = retain_lsns;
gc_info.cutoff = cutoff;
gc_info.pitr = pitr;
}
///
@@ -1884,7 +2058,7 @@ impl LayeredTimeline {
/// obsolete.
///
fn gc(&self) -> Result<GcResult> {
let now = Instant::now();
let now = SystemTime::now();
let mut result: GcResult = Default::default();
let disk_consistent_lsn = self.get_disk_consistent_lsn();
@@ -1893,6 +2067,7 @@ impl LayeredTimeline {
let gc_info = self.gc_info.read().unwrap();
let retain_lsns = &gc_info.retain_lsns;
let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
@@ -1910,8 +2085,9 @@ impl LayeredTimeline {
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk image layers cover the layer's whole key range
// 2. it is older than PITR interval;
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
//
let mut layers = self.layers.write().unwrap();
'outer: for l in layers.iter_historic_layers() {
@@ -1937,8 +2113,31 @@ impl LayeredTimeline {
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. Is it needed by a child branch?
// 2. Is it newer than the PITR interval?
// We use the modification time of the layer file to estimate its update time.
// The estimate is not precise, but maintaining an LSN->timestamp map seems to be overkill.
// Users are not expected to need high precision here, and the estimate is
// conservative: a file's modification time is always newer than the actual time
// the version was created, so it is safe for users.
// TODO A possible "bloat" issue still persists here.
// If modification time changes because of layer upload/download, we will keep these files
// longer than necessary.
// https://github.com/neondatabase/neon/issues/1554
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
debug!(
"keeping {} because its modification time {:?} is newer than PITR {:?}",
l.filename().display(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
}
// 3. Is it needed by a child branch?
// NOTE With that we would keep data that
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
@@ -1957,7 +2156,7 @@ impl LayeredTimeline {
}
}
// 3. Is there a later on-disk layer for this relation?
// 4. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -1998,7 +2197,7 @@ impl LayeredTimeline {
result.layers_removed += 1;
}
result.elapsed = now.elapsed();
result.elapsed = now.elapsed()?;
Ok(result)
}
@@ -2275,7 +2474,8 @@ pub mod tests {
}
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
@@ -2345,7 +2545,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
@@ -2422,7 +2622,7 @@ pub mod tests {
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff);
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;

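The PITR check added to gc() above boils down to a modification-time comparison. A standalone sketch of that check (an assumed helper, not part of the patch):

use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

// Keep a layer file if its mtime falls inside the PITR window ending at `now`.
fn needed_by_pitr(layer_path: &Path, now: SystemTime, pitr: Duration) -> anyhow::Result<bool> {
    let last_modified = fs::metadata(layer_path)?.modified()?;
    // `duration_since` fails if the mtime lies in the future; treat that
    // conservatively and keep the layer.
    Ok(match now.duration_since(last_modified) {
        Ok(age) => age < pitr,
        Err(_) => true,
    })
}
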
View File

@@ -11,6 +11,7 @@ pub mod profiling;
pub mod reltag;
pub mod remote_storage;
pub mod repository;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;

View File

@@ -19,6 +19,7 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
use tracing::*;
use utils::{
auth::{self, Claims, JwtAuth, Scope},
@@ -676,6 +677,37 @@ impl postgres_backend::Handler for PageServerHandler {
}
}
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
ensure!(params.len() == 1, "invalid param number for config command");
let tenantid = ZTenantId::from_str(params[0])?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"compaction_target_size"),
RowDescriptor::int8_col(b"compaction_period"),
RowDescriptor::int8_col(b"compaction_threshold"),
RowDescriptor::int8_col(b"gc_horizon"),
RowDescriptor::int8_col(b"gc_period"),
RowDescriptor::int8_col(b"pitr_interval"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(repo.get_checkpoint_distance().to_string().as_bytes()),
Some(repo.get_compaction_target_size().to_string().as_bytes()),
Some(
repo.get_compaction_period()
.as_secs()
.to_string()
.as_bytes(),
),
Some(repo.get_compaction_threshold().to_string().as_bytes()),
Some(repo.get_gc_horizon().to_string().as_bytes()),
Some(repo.get_gc_period().as_secs().to_string().as_bytes()),
Some(repo.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
@@ -693,16 +725,20 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon: u64 = caps
.get(4)
.map(|h| h.as_str().parse())
.unwrap_or(Ok(self.conf.gc_horizon))?;
.unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"),
@@ -711,6 +747,7 @@ impl postgres_backend::Handler for PageServerHandler {
.write_message_noflush(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()),

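A possible client-side use of the new 'show <tenant_id>' command over the pageserver's libpq endpoint, using the postgres crate that the control plane already depends on. The connection string is a placeholder and assumes no authentication is required.

use postgres::{Client, NoTls, SimpleQueryMessage};

// Hypothetical sketch: print a tenant's effective GC settings.
fn show_tenant_config(tenant_id: &str) -> anyhow::Result<()> {
    let mut client = Client::connect("host=127.0.0.1 port=64000 user=zenith_admin", NoTls)?;
    for message in client.simple_query(&format!("show {}", tenant_id))? {
        if let SimpleQueryMessage::Row(row) = message {
            // Column order follows the RowDescription above: index 4 is
            // gc_horizon, index 6 is pitr_interval; all values arrive as text.
            let gc_horizon = row.get(4usize);
            let pitr_interval = row.get(6usize);
            println!("gc_horizon={:?} pitr_interval={:?}", gc_horizon, pitr_interval);
        }
    }
    Ok(())
}
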
View File

@@ -249,6 +249,7 @@ pub trait Repository: Send + Sync {
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
@@ -305,6 +306,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
pub struct GcResult {
pub layers_total: u64,
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -315,6 +317,7 @@ pub struct GcResult {
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.layers_total += other.layers_total;
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_not_updated += other.layers_not_updated;
@@ -432,6 +435,7 @@ pub mod repo_harness {
};
use super::*;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use hex_literal::hex;
use utils::zid::ZTenantId;
@@ -454,8 +458,23 @@ pub mod repo_harness {
static ref LOCK: RwLock<()> = RwLock::new(());
}
impl From<TenantConf> for TenantConfOpt {
fn from(tenant_conf: TenantConf) -> Self {
Self {
checkpoint_distance: Some(tenant_conf.checkpoint_distance),
compaction_target_size: Some(tenant_conf.compaction_target_size),
compaction_period: Some(tenant_conf.compaction_period),
compaction_threshold: Some(tenant_conf.compaction_threshold),
gc_horizon: Some(tenant_conf.gc_horizon),
gc_period: Some(tenant_conf.gc_period),
pitr_interval: Some(tenant_conf.pitr_interval),
}
}
}
pub struct RepoHarness<'a> {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: ZTenantId,
pub lock_guard: (
@@ -487,12 +506,15 @@ pub mod repo_harness {
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenant_conf = TenantConf::dummy_conf();
let tenant_id = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.timelines_path(&tenant_id))?;
Ok(Self {
conf,
tenant_conf,
tenant_id,
lock_guard,
})
@@ -507,6 +529,7 @@ pub mod repo_harness {
let repo = LayeredRepository::new(
self.conf,
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
self.tenant_id,
RemoteIndex::empty(),
@@ -722,7 +745,7 @@ mod tests {
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
@@ -773,7 +796,7 @@ mod tests {
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) {
@@ -796,7 +819,7 @@ mod tests {
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
@@ -815,7 +838,7 @@ mod tests {
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
// run gc on parent
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// Check that the data is still accessible on the branch.
assert_eq!(

View File

@@ -0,0 +1,162 @@
//! Functions for handling per-tenant configuration options
//!
//! If a tenant is created with the --config option,
//! the tenant-specific config is stored in the tenant's directory.
//! Otherwise, the global pageserver config is used.
//!
//! If the tenant config file is corrupted, the tenant will be disabled.
//! We cannot fall back to the global or default config instead, because wrong
//! settings may lead to data loss.
//!
use crate::config::PageServerConf;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
use utils::zid::ZTenantId;
pub const TENANT_CONFIG_NAME: &str = "config";
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
}
/// Per-tenant configuration options
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
pub compaction_period: Duration,
// Level0 delta layer threshold for compaction.
pub compaction_threshold: usize,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
// Page versions older than this are garbage collected away.
pub gc_horizon: u64,
// Interval at which garbage collection is triggered.
pub gc_period: Duration,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is time.
// Page versions older than this are garbage collected away.
pub pitr_interval: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
/// which parameters are set and which are not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TenantConfOpt {
pub checkpoint_distance: Option<u64>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<Duration>,
pub compaction_threshold: Option<usize>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<Duration>,
pub pitr_interval: Option<Duration>,
}
impl TenantConfOpt {
pub fn merge(&self, global_conf: TenantConf) -> TenantConf {
TenantConf {
checkpoint_distance: self
.checkpoint_distance
.unwrap_or(global_conf.checkpoint_distance),
compaction_target_size: self
.compaction_target_size
.unwrap_or(global_conf.compaction_target_size),
compaction_period: self
.compaction_period
.unwrap_or(global_conf.compaction_period),
compaction_threshold: self
.compaction_threshold
.unwrap_or(global_conf.compaction_threshold),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
}
}
pub fn update(&mut self, other: &TenantConfOpt) {
if let Some(checkpoint_distance) = other.checkpoint_distance {
self.checkpoint_distance = Some(checkpoint_distance);
}
if let Some(compaction_target_size) = other.compaction_target_size {
self.compaction_target_size = Some(compaction_target_size);
}
if let Some(compaction_period) = other.compaction_period {
self.compaction_period = Some(compaction_period);
}
if let Some(compaction_threshold) = other.compaction_threshold {
self.compaction_threshold = Some(compaction_threshold);
}
if let Some(gc_horizon) = other.gc_horizon {
self.gc_horizon = Some(gc_horizon);
}
if let Some(gc_period) = other.gc_period {
self.gc_period = Some(gc_period);
}
if let Some(pitr_interval) = other.pitr_interval {
self.pitr_interval = Some(pitr_interval);
}
}
}
impl TenantConf {
pub fn default() -> TenantConf {
use defaults::*;
TenantConf {
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
}
}
/// Points to a place in the pageserver's local directory
/// where a given tenant's tenantconf file should be located.
pub fn path(conf: &'static PageServerConf, tenantid: ZTenantId) -> PathBuf {
conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME)
}
#[cfg(test)]
pub fn dummy_conf() -> Self {
TenantConf {
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
pitr_interval: Duration::from_secs(60 * 60),
}
}
}

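To make the override semantics concrete, a small sketch as seen from elsewhere in the pageserver crate (values are arbitrary): update() folds a second set of overrides into the first, and merge() fills whatever is still unset from a full TenantConf.

use std::time::Duration;

use crate::tenant_config::{TenantConf, TenantConfOpt};

// gc_horizon comes from the first override set, pitr_interval from the second,
// and every other field from TenantConf::default().
fn example_overrides() -> TenantConf {
    let mut overrides = TenantConfOpt {
        gc_horizon: Some(1024 * 1024),
        ..Default::default()
    };
    overrides.update(&TenantConfOpt {
        pitr_interval: Some(Duration::from_secs(3600)),
        ..Default::default()
    });
    overrides.merge(TenantConf::default())
}
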
View File

@@ -5,6 +5,7 @@ use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository;
use crate::remote_storage::RemoteIndex;
use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
@@ -63,13 +64,13 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
TENANTS.lock().unwrap()
}
// Sets up wal redo manager and repository for tenant. Reduces code duplocation.
// Sets up wal redo manager and repository for tenant. Reduces code duplication.
// Used during pageserver startup, or when new tenant is attached to pageserver.
pub fn load_local_repo(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> Arc<RepositoryImpl> {
) -> Result<Arc<RepositoryImpl>> {
let mut m = access_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
// Set up a WAL redo manager, for applying WAL records.
@@ -78,6 +79,7 @@ pub fn load_local_repo(
// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
conf,
Default::default(),
Arc::new(walredo_mgr),
tenant_id,
remote_index.clone(),
@@ -89,7 +91,12 @@ pub fn load_local_repo(
timelines: HashMap::new(),
}
});
Arc::clone(&tenant.repo)
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;
Ok(Arc::clone(&tenant.repo))
}
/// Updates tenants' repositories, changing their timelines state in memory.
@@ -109,7 +116,16 @@ pub fn apply_timeline_sync_status_updates(
trace!("Sync status updates: {:?}", sync_status_updates);
for (tenant_id, tenant_timelines_sync_status_updates) in sync_status_updates {
let repo = load_local_repo(conf, tenant_id, remote_index);
let repo = match load_local_repo(conf, tenant_id, remote_index) {
Ok(repo) => repo,
Err(e) => {
error!(
"Failed to load repo for tenant {} Error: {:#}",
tenant_id, e
);
continue;
}
};
for (timeline_id, timeline_sync_status_update) in tenant_timelines_sync_status_updates {
match repo.apply_timeline_remote_sync_status_update(timeline_id, timeline_sync_status_update)
@@ -174,6 +190,7 @@ pub fn shutdown_all_tenants() {
pub fn create_tenant_repository(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenantid: ZTenantId,
remote_index: RemoteIndex,
) -> Result<Option<ZTenantId>> {
@@ -186,6 +203,7 @@ pub fn create_tenant_repository(
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = timelines::create_repo(
conf,
tenant_conf,
tenantid,
CreateRepo::Real {
wal_redo_manager,
@@ -202,6 +220,14 @@ pub fn create_tenant_repository(
}
}
pub fn update_tenant_config(tenant_conf: TenantConfOpt, tenantid: ZTenantId) -> Result<()> {
info!("configuring tenant {}", tenantid);
let repo = get_repository_for_tenant(tenantid)?;
repo.update_tenant_config(tenant_conf)?;
Ok(())
}
pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
Some(access_tenants().get(&tenantid)?.state)
}
@@ -210,7 +236,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
/// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
let mut m = access_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -230,7 +256,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None,
"Compactor thread",
true,
move || crate::tenant_threads::compact_loop(tenant_id, conf),
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let gc_spawn_result = thread_mgr::spawn(
@@ -239,7 +265,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
None,
"GC thread",
true,
move || crate::tenant_threads::gc_loop(tenant_id, conf),
move || crate::tenant_threads::gc_loop(tenant_id),
)
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
@@ -251,7 +277,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
}
@@ -290,7 +315,7 @@ pub fn get_timeline_for_tenant_load(
.get_timeline_load(timelineid)
.with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?;
let repartition_distance = tenant.repo.conf.checkpoint_distance / 10;
let repartition_distance = tenant.repo.get_checkpoint_distance() / 10;
let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
page_tline.init_logical_size()?;
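
With this change repartition_distance is derived from the tenant's own checkpoint_distance rather than the global pageserver setting, so raising checkpoint_distance for a tenant also makes its timelines repartition proportionally less often.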

View File

@@ -1,6 +1,5 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::config::PageServerConf;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid, conf) {
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
}
}
fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(conf.compaction_period);
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
@@ -46,23 +47,23 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.gc_iteration(None, conf.gc_horizon, false)?;
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;

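Two points worth noting: the loops now re-read the per-tenant values (compaction_period, gc_horizon, gc_period, pitr_interval) from the repository on every iteration, so a later 'tenant config' call takes effect without restarting the threads; and the TODO above asks for a condvar-based wait instead of the one-second polling sleep. A self-contained sketch of such an interruptible wait follows; the type and the shutdown wiring are illustrative only, not the pageserver's actual mechanism:

use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Pairs a "should stop" flag with a condvar so the GC loop can sleep for the
// whole gc_period but still wake up immediately when shutdown is requested.
struct StopSignal {
    stop: Mutex<bool>,
    cond: Condvar,
}

impl StopSignal {
    fn new() -> Self {
        Self { stop: Mutex::new(false), cond: Condvar::new() }
    }

    // Sleep up to `timeout`; return true if shutdown was requested meanwhile.
    fn wait_for_stop(&self, timeout: Duration) -> bool {
        let guard = self.stop.lock().unwrap();
        let (guard, _timed_out) = self
            .cond
            .wait_timeout_while(guard, timeout, |stopped| !*stopped)
            .unwrap();
        *guard
    }

    // Called from the shutdown path to wake any sleeping loops immediately.
    fn request_stop(&self) {
        *self.stop.lock().unwrap() = true;
        self.cond.notify_all();
    }
}

gc_loop could then call wait_for_stop(repo.get_gc_period()) instead of counting down one second at a time.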
View File

@@ -25,6 +25,7 @@ use crate::{
layered_repository::metadata::TimelineMetadata,
remote_storage::RemoteIndex,
repository::{LocalTimelineState, Repository},
tenant_config::TenantConfOpt,
DatadirTimeline, RepositoryImpl,
};
use crate::{import_datadir, LOG_FILE_NAME};
@@ -151,8 +152,8 @@ pub fn init_pageserver(
if let Some(tenant_id) = create_tenant {
println!("initializing tenantid {}", tenant_id);
let repo =
create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
let repo = create_repo(conf, Default::default(), tenant_id, CreateRepo::Dummy)
.context("failed to create repo")?;
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
.context("failed to create initial timeline")?;
@@ -175,6 +176,7 @@ pub enum CreateRepo {
pub fn create_repo(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: ZTenantId,
create_repo: CreateRepo,
) -> Result<Arc<RepositoryImpl>> {
@@ -211,8 +213,12 @@ pub fn create_repo(
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
info!("created directory structure in {}", repo_dir.display());
// Save tenant's config
LayeredRepository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
Ok(Arc::new(LayeredRepository::new(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_index,

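persist_tenant_config itself is not shown in this hunk. A hedged sketch of saving the per-tenant config file with a write-then-rename pattern, so that a crash cannot leave a truncated file for load_tenant_config to trip over (the file name and the helper are assumptions, not the real implementation):

use std::io::Write;
use std::path::Path;

// Write `contents` to `<tenant_dir>/config` atomically: write to a temporary
// file first, flush it to disk, then rename it over the final path.
fn persist_tenant_conf_sketch(tenant_dir: &Path, contents: &str) -> anyhow::Result<()> {
    let final_path = tenant_dir.join("config"); // file name is an assumption
    let tmp_path = tenant_dir.join("config.tmp");
    let mut file = std::fs::File::create(&tmp_path)?;
    file.write_all(contents.as_bytes())?;
    file.sync_all()?;
    std::fs::rename(&tmp_path, &final_path)?;
    Ok(())
}
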
View File

@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
receivers.insert((tenantid, timelineid), receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::activate_tenant(conf, tenantid)?;
tenant_mgr::activate_tenant(tenantid)?;
}
};
Ok(())

View File

@@ -0,0 +1,49 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import ZenithEnvBuilder
def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
    """Test per-tenant configuration"""
    env = zenith_env_builder.init_start()
tenant = env.zenith_cli.create_tenant(
conf={
'checkpoint_distance': '10000',
'compaction_target_size': '1048576',
'compaction_period': '60sec',
'compaction_threshold': '20',
'gc_horizon': '1024',
'gc_period': '100sec',
'pitr_interval': '3600sec',
})
env.zenith_cli.create_timeline(f'test_tenant_conf', tenant_id=tenant)
pg = env.postgres.create_start(
"test_tenant_conf",
"main",
tenant,
)
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"show {tenant.hex}")
assert pscur.fetchone() == (10000, 1048576, 60, 20, 1024, 100, 3600)
# update the config and ensure that it has changed
env.zenith_cli.config_tenant(tenant_id=tenant,
conf={
'checkpoint_distance': '100000',
'compaction_target_size': '1048576',
'compaction_period': '30sec',
'compaction_threshold': '15',
'gc_horizon': '256',
'gc_period': '10sec',
'pitr_interval': '360sec',
})
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"show {tenant.hex}")
assert pscur.fetchone() == (100000, 1048576, 30, 15, 256, 10, 360)
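
Judging by the values asserted above, the tuple returned by the pageserver's 'show <tenant id>' query appears to be, in order: checkpoint_distance, compaction_target_size, compaction_period (seconds), compaction_threshold, gc_horizon, gc_period (seconds), pitr_interval (seconds). That ordering is inferred from the test data, not from the protocol handler itself.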

View File

@@ -835,16 +835,35 @@ class ZenithCli:
self.env = env
pass
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
def create_tenant(self,
tenant_id: Optional[uuid.UUID] = None,
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
"""
        Creates a new tenant and returns its id.
"""
if tenant_id is None:
tenant_id = uuid.uuid4()
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
if conf is None:
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
else:
res = self.raw_cli(
['tenant', 'create', '--tenant-id', tenant_id.hex] +
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res.check_returncode()
return tenant_id
    def config_tenant(self, tenant_id: uuid.UUID, conf: Optional[Dict[str, str]] = None):
"""
Update tenant config.
"""
if conf is None:
res = self.raw_cli(['tenant', 'config', '--tenant-id', tenant_id.hex])
else:
res = self.raw_cli(
['tenant', 'config', '--tenant-id', tenant_id.hex] +
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res.check_returncode()
def list_tenants(self) -> 'subprocess.CompletedProcess[str]':
res = self.raw_cli(['tenant', 'list'])
res.check_returncode()

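Both helpers turn the conf dict into repeated '-c key:value' flags, so create_tenant(conf={'gc_horizon': '1024', 'pitr_interval': '3600sec'}) ends up running the CLI as tenant create --tenant-id <hex> -c gc_horizon:1024 -c pitr_interval:3600sec (the binary itself is whatever raw_cli wraps).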
View File

@@ -166,7 +166,12 @@ fn main() -> Result<()> {
.subcommand(App::new("create")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
)
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
.subcommand(App::new("config")
.arg(tenant_id_arg.clone())
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
)
.subcommand(
App::new("pageserver")
@@ -523,8 +528,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
}
Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?;
let tenant_conf: HashMap<_, _> = create_match
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver
.tenant_create(initial_tenant_id)?
.tenant_create(initial_tenant_id, tenant_conf)?
.ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?;
@@ -533,6 +542,27 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
new_tenant_id
);
}
Some(("config", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;
let tenant_conf: HashMap<_, _> = create_match
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
            pageserver
                .tenant_config(tenant_id, tenant_conf)
                .map_err(|e| {
                    anyhow!(
                        "Tenant config failed for tenant with id {}: {}",
                        tenant_id,
                        e
                    )
                })?;
println!(
"tenant {} successfully configured on the pageserver",
tenant_id
);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
None => bail!("no tenant subcommand provided"),
}
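
A note on the '-c' parsing used in both branches: values_of("config") yields the raw 'key:value' strings and split_once(':') splits each one at the first colon, so values such as 'pitr_interval:3600sec' survive intact; an argument without a colon makes split_once return None, which flat_map silently drops instead of reporting an error.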