diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index a01ffd30f6..7520ad9304 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::io::Write; use std::net::TcpStream; use std::path::PathBuf; @@ -9,7 +10,7 @@ use anyhow::{bail, Context}; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest}; +use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest}; use pageserver::timelines::TimelineInfo; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -344,10 +345,32 @@ impl PageServerNode { pub fn tenant_create( &self, new_tenant_id: Option, + settings: HashMap<&str, &str>, ) -> anyhow::Result> { let tenant_id_string = self .http_request(Method::POST, format!("{}/tenant", self.http_base_url)) - .json(&TenantCreateRequest { new_tenant_id }) + .json(&TenantCreateRequest { + new_tenant_id, + checkpoint_distance: settings + .get("checkpoint_distance") + .map(|x| x.parse::()) + .transpose()?, + compaction_target_size: settings + .get("compaction_target_size") + .map(|x| x.parse::()) + .transpose()?, + compaction_period: settings.get("compaction_period").map(|x| x.to_string()), + compaction_threshold: settings + .get("compaction_threshold") + .map(|x| x.parse::()) + .transpose()?, + gc_horizon: settings + .get("gc_horizon") + .map(|x| x.parse::()) + .transpose()?, + gc_period: settings.get("gc_period").map(|x| x.to_string()), + pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()), + }) .send()? .error_from_body()? .json::>()?; @@ -364,6 +387,32 @@ impl PageServerNode { .transpose() } + pub fn tenant_config(&self, tenant_id: ZTenantId, settings: HashMap<&str, &str>) -> Result<()> { + self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url)) + .json(&TenantConfigRequest { + tenant_id, + checkpoint_distance: settings + .get("checkpoint_distance") + .map(|x| x.parse::().unwrap()), + compaction_target_size: settings + .get("compaction_target_size") + .map(|x| x.parse::().unwrap()), + compaction_period: settings.get("compaction_period").map(|x| x.to_string()), + compaction_threshold: settings + .get("compaction_threshold") + .map(|x| x.parse::().unwrap()), + gc_horizon: settings + .get("gc_horizon") + .map(|x| x.parse::().unwrap()), + gc_period: settings.get("gc_period").map(|x| x.to_string()), + pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()), + }) + .send()? 
+ .error_from_body()?; + + Ok(()) + } + pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result> { let timeline_infos: Vec = self .http_request( diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9b944cc2ec..5c135e4eb4 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -246,11 +246,12 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses { // initialize local tenant - let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index); + let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index) + .with_context(|| format!("Failed to load repo for tenant {}", tenant_id))?; for (timeline_id, init_status) in local_timeline_init_statuses { match init_status { remote_storage::LocalTimelineInitStatus::LocallyComplete => { - debug!("timeline {} for tenant {} is locally complete, registering it in repository", tenant_id, timeline_id); + debug!("timeline {} for tenant {} is locally complete, registering it in repository", timeline_id, tenant_id); // Lets fail here loudly to be on the safe side. // XXX: It may be a better api to actually distinguish between repository startup // and processing of newly downloaded timelines. diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 24ab45386d..b2c4a62796 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -5,6 +5,12 @@ //! See also `settings.md` for better description on every parameter. use anyhow::{bail, ensure, Context, Result}; +use std::convert::TryInto; +use std::env; +use std::num::{NonZeroU32, NonZeroUsize}; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; use toml_edit; use toml_edit::{Document, Item}; use utils::{ @@ -12,16 +18,11 @@ use utils::{ zid::{ZNodeId, ZTenantId, ZTimelineId}, }; -use std::convert::TryInto; -use std::env; -use std::num::{NonZeroU32, NonZeroUsize}; -use std::path::{Path, PathBuf}; -use std::str::FromStr; -use std::time::Duration; - use crate::layered_repository::TIMELINES_SEGMENT_NAME; +use crate::tenant_config::{TenantConf, TenantConfOpt}; pub mod defaults { + use crate::tenant_config::defaults::*; use const_format::formatcp; pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; @@ -29,21 +30,6 @@ pub mod defaults { pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); - // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB - // would be more appropriate. But a low value forces the code to be exercised more, - // which is good for now to trigger bugs. - // This parameter actually determines L0 layer file size. - pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; - - // Target file size, when creating image and delta layers. - // This parameter determines L1 layer file size. 
- pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024; - pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s"; - pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10; - - pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; - pub const DEFAULT_GC_PERIOD: &str = "100 s"; - pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s"; pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; @@ -64,14 +50,6 @@ pub mod defaults { #listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}' #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}' -#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes -#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes -#compaction_period = '{DEFAULT_COMPACTION_PERIOD}' -#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}' - -#gc_period = '{DEFAULT_GC_PERIOD}' -#gc_horizon = {DEFAULT_GC_HORIZON} - #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}' #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}' @@ -80,6 +58,16 @@ pub mod defaults { # initial superuser role name to use when creating a new tenant #initial_superuser_name = '{DEFAULT_SUPERUSER}' +# [tenant_config] +#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes +#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes +#compaction_period = '{DEFAULT_COMPACTION_PERIOD}' +#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}' + +#gc_period = '{DEFAULT_GC_PERIOD}' +#gc_horizon = {DEFAULT_GC_HORIZON} +#pitr_interval = '{DEFAULT_PITR_INTERVAL}' + # [remote_storage] "### @@ -97,25 +85,6 @@ pub struct PageServerConf { /// Example (default): 127.0.0.1:9898 pub listen_http_addr: String, - // Flush out an inmemory layer, if it's holding WAL older than this - // This puts a backstop on how much WAL needs to be re-digested if the - // page server crashes. - // This parameter actually determines L0 layer file size. - pub checkpoint_distance: u64, - - // Target file size, when creating image and delta layers. - // This parameter determines L1 layer file size. - pub compaction_target_size: u64, - - // How often to check if there's compaction work to be done. - pub compaction_period: Duration, - - // Level0 delta layer threshold for compaction. - pub compaction_threshold: usize, - - pub gc_horizon: u64, - pub gc_period: Duration, - // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. pub wait_lsn_timeout: Duration, // How long to wait for WAL redo to complete. 
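One thing worth calling out about the relocated settings: the commented sample config above introduces the block as `# [tenant_config]`, while the parser added further down in this file (`parse_and_validate` / `parse_toml_tenant_conf`) matches the table name `tenant_conf`. A minimal sketch of how the per-tenant table is parsed, assuming it lives in a #[cfg(test)] module in config.rs (the values are just the defaults quoted above):

    #[test]
    fn parse_tenant_conf_table() -> anyhow::Result<()> {
        // Table name follows the parser ("tenant_conf"), not the commented sample.
        let doc = r#"
    [tenant_conf]
    checkpoint_distance = 268435456   # 256 MiB
    compaction_period = '1 s'
    pitr_interval = '30 days'
    "#
        .parse::<toml_edit::Document>()?;

        let t_conf = PageServerConf::parse_toml_tenant_conf(&doc["tenant_conf"])?;
        assert_eq!(t_conf.checkpoint_distance, Some(256 * 1024 * 1024));
        assert_eq!(t_conf.pitr_interval, Some(humantime::parse_duration("30 days")?));
        Ok(())
    }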
@@ -142,6 +111,7 @@ pub struct PageServerConf { pub remote_storage_config: Option, pub profiling: ProfilingConfig, + pub default_tenant_conf: TenantConf, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -185,15 +155,6 @@ struct PageServerConfigBuilder { listen_http_addr: BuilderValue, - checkpoint_distance: BuilderValue, - - compaction_target_size: BuilderValue, - compaction_period: BuilderValue, - compaction_threshold: BuilderValue, - - gc_horizon: BuilderValue, - gc_period: BuilderValue, - wait_lsn_timeout: BuilderValue, wal_redo_timeout: BuilderValue, @@ -224,14 +185,6 @@ impl Default for PageServerConfigBuilder { Self { listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()), listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()), - checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE), - compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE), - compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD) - .expect("cannot parse default compaction period")), - compaction_threshold: Set(DEFAULT_COMPACTION_THRESHOLD), - gc_horizon: Set(DEFAULT_GC_HORIZON), - gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD) - .expect("cannot parse default gc period")), wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT) .expect("cannot parse default wait lsn timeout")), wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT) @@ -261,30 +214,6 @@ impl PageServerConfigBuilder { self.listen_http_addr = BuilderValue::Set(listen_http_addr) } - pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) { - self.checkpoint_distance = BuilderValue::Set(checkpoint_distance) - } - - pub fn compaction_target_size(&mut self, compaction_target_size: u64) { - self.compaction_target_size = BuilderValue::Set(compaction_target_size) - } - - pub fn compaction_period(&mut self, compaction_period: Duration) { - self.compaction_period = BuilderValue::Set(compaction_period) - } - - pub fn compaction_threshold(&mut self, compaction_threshold: usize) { - self.compaction_threshold = BuilderValue::Set(compaction_threshold) - } - - pub fn gc_horizon(&mut self, gc_horizon: u64) { - self.gc_horizon = BuilderValue::Set(gc_horizon) - } - - pub fn gc_period(&mut self, gc_period: Duration) { - self.gc_period = BuilderValue::Set(gc_period) - } - pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) { self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout) } @@ -344,22 +273,6 @@ impl PageServerConfigBuilder { listen_http_addr: self .listen_http_addr .ok_or(anyhow::anyhow!("missing listen_http_addr"))?, - checkpoint_distance: self - .checkpoint_distance - .ok_or(anyhow::anyhow!("missing checkpoint_distance"))?, - compaction_target_size: self - .compaction_target_size - .ok_or(anyhow::anyhow!("missing compaction_target_size"))?, - compaction_period: self - .compaction_period - .ok_or(anyhow::anyhow!("missing compaction_period"))?, - compaction_threshold: self - .compaction_threshold - .ok_or(anyhow::anyhow!("missing compaction_threshold"))?, - gc_horizon: self - .gc_horizon - .ok_or(anyhow::anyhow!("missing gc_horizon"))?, - gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?, wait_lsn_timeout: self .wait_lsn_timeout .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?, @@ -386,6 +299,8 @@ impl PageServerConfigBuilder { .ok_or(anyhow::anyhow!("missing remote_storage_config"))?, id: self.id.ok_or(anyhow::anyhow!("missing id"))?, profiling: self.profiling.ok_or(anyhow::anyhow!("missing profiling"))?, + // TenantConf is handled separately 
+ default_tenant_conf: TenantConf::default(), }) } } @@ -488,20 +403,12 @@ impl PageServerConf { let mut builder = PageServerConfigBuilder::default(); builder.workdir(workdir.to_owned()); + let mut t_conf: TenantConfOpt = Default::default(); + for (key, item) in toml.iter() { match key { "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?), "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?), - "checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?), - "compaction_target_size" => { - builder.compaction_target_size(parse_toml_u64(key, item)?) - } - "compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?), - "compaction_threshold" => { - builder.compaction_threshold(parse_toml_u64(key, item)? as usize) - } - "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?), - "gc_period" => builder.gc_period(parse_toml_duration(key, item)?), "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?), "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?), "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?), @@ -519,6 +426,9 @@ impl PageServerConf { "remote_storage" => { builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?)) } + "tenant_conf" => { + t_conf = Self::parse_toml_tenant_conf(item)?; + } "id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)), "profiling" => builder.profiling(parse_toml_from_str(key, item)?), _ => bail!("unrecognized pageserver option '{}'", key), @@ -547,9 +457,42 @@ impl PageServerConf { ); } + conf.default_tenant_conf = t_conf.merge(TenantConf::default()); + Ok(conf) } + // subroutine of parse_and_validate to parse `[tenant_conf]` section + + pub fn parse_toml_tenant_conf(item: &toml_edit::Item) -> Result { + let mut t_conf: TenantConfOpt = Default::default(); + for (key, item) in item + .as_table() + .ok_or(anyhow::anyhow!("invalid tenant config"))? + .iter() + { + match key { + "checkpoint_distance" => { + t_conf.checkpoint_distance = Some(parse_toml_u64(key, item)?) + } + "compaction_target_size" => { + t_conf.compaction_target_size = Some(parse_toml_u64(key, item)?) + } + "compaction_period" => { + t_conf.compaction_period = Some(parse_toml_duration(key, item)?) + } + "compaction_threshold" => { + t_conf.compaction_threshold = Some(parse_toml_u64(key, item)? as usize) + } + "gc_horizon" => t_conf.gc_horizon = Some(parse_toml_u64(key, item)?), + "gc_period" => t_conf.gc_period = Some(parse_toml_duration(key, item)?), + "pitr_interval" => t_conf.pitr_interval = Some(parse_toml_duration(key, item)?), + _ => bail!("unrecognized tenant config option '{}'", key), + } + } + Ok(t_conf) + } + /// subroutine of parse_config(), to parse the `[remote_storage]` table. 
fn parse_remote_storage_config(toml: &toml_edit::Item) -> anyhow::Result { let local_path = toml.get("local_path"); @@ -635,12 +578,6 @@ impl PageServerConf { pub fn dummy_conf(repo_dir: PathBuf) -> Self { PageServerConf { id: ZNodeId(0), - checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, - compaction_target_size: 4 * 1024 * 1024, - compaction_period: Duration::from_secs(10), - compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD, - gc_horizon: defaults::DEFAULT_GC_HORIZON, - gc_period: Duration::from_secs(10), wait_lsn_timeout: Duration::from_secs(60), wal_redo_timeout: Duration::from_secs(60), page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE, @@ -654,6 +591,7 @@ impl PageServerConf { auth_validation_public_key_path: None, remote_storage_config: None, profiling: ProfilingConfig::Disabled, + default_tenant_conf: TenantConf::dummy_conf(), } } } @@ -711,15 +649,6 @@ mod tests { listen_pg_addr = '127.0.0.1:64000' listen_http_addr = '127.0.0.1:9898' -checkpoint_distance = 111 # in bytes - -compaction_target_size = 111 # in bytes -compaction_period = '111 s' -compaction_threshold = 2 - -gc_period = '222 s' -gc_horizon = 222 - wait_lsn_timeout = '111 s' wal_redo_timeout = '111 s' @@ -751,12 +680,6 @@ id = 10 id: ZNodeId(10), listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), - checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, - compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE, - compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?, - compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD, - gc_horizon: defaults::DEFAULT_GC_HORIZON, - gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?, wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?, wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?, superuser: defaults::DEFAULT_SUPERUSER.to_string(), @@ -768,6 +691,7 @@ id = 10 auth_validation_public_key_path: None, remote_storage_config: None, profiling: ProfilingConfig::Disabled, + default_tenant_conf: TenantConf::default(), }, "Correct defaults should be used when no config values are provided" ); @@ -798,12 +722,6 @@ id = 10 id: ZNodeId(10), listen_pg_addr: "127.0.0.1:64000".to_string(), listen_http_addr: "127.0.0.1:9898".to_string(), - checkpoint_distance: 111, - compaction_target_size: 111, - compaction_period: Duration::from_secs(111), - compaction_threshold: 2, - gc_horizon: 222, - gc_period: Duration::from_secs(222), wait_lsn_timeout: Duration::from_secs(111), wal_redo_timeout: Duration::from_secs(111), superuser: "zzzz".to_string(), @@ -815,6 +733,7 @@ id = 10 auth_validation_public_key_path: None, remote_storage_config: None, profiling: ProfilingConfig::Disabled, + default_tenant_conf: TenantConf::default(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 9b51e48477..b24b3dc316 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -20,11 +20,18 @@ pub struct TimelineCreateRequest { } #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Default)] pub struct TenantCreateRequest { #[serde(default)] #[serde_as(as = "Option")] pub new_tenant_id: Option, + pub checkpoint_distance: Option, + pub compaction_target_size: Option, + pub compaction_period: Option, + pub compaction_threshold: Option, + pub gc_horizon: Option, 
+ pub gc_period: Option, + pub pitr_interval: Option, } #[serde_as] @@ -36,3 +43,42 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId pub struct StatusResponse { pub id: ZNodeId, } + +impl TenantCreateRequest { + pub fn new(new_tenant_id: Option) -> TenantCreateRequest { + TenantCreateRequest { + new_tenant_id, + ..Default::default() + } + } +} + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub struct TenantConfigRequest { + pub tenant_id: ZTenantId, + #[serde(default)] + #[serde_as(as = "Option")] + pub checkpoint_distance: Option, + pub compaction_target_size: Option, + pub compaction_period: Option, + pub compaction_threshold: Option, + pub gc_horizon: Option, + pub gc_period: Option, + pub pitr_interval: Option, +} + +impl TenantConfigRequest { + pub fn new(tenant_id: ZTenantId) -> TenantConfigRequest { + TenantConfigRequest { + tenant_id, + checkpoint_distance: None, + compaction_target_size: None, + compaction_period: None, + compaction_threshold: None, + gc_horizon: None, + gc_period: None, + pitr_interval: None, + } + } +} diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index c0b07418f3..9932a2d08d 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -328,11 +328,7 @@ paths: content: application/json: schema: - type: object - properties: - new_tenant_id: - type: string - format: hex + $ref: "#/components/schemas/TenantCreateInfo" responses: "201": description: New tenant created successfully @@ -371,7 +367,48 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" - + /v1/tenant/config: + put: + description: | + Update tenant's config. + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/TenantConfigInfo" + responses: + "200": + description: OK + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/TenantInfo" + "400": + description: Malformed tenant config request + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" components: securitySchemes: JWT: @@ -389,6 +426,45 @@ components: type: string state: type: string + TenantCreateInfo: + type: object + properties: + new_tenant_id: + type: string + format: hex + tenant_id: + type: string + format: hex + gc_period: + type: string + gc_horizon: + type: integer + pitr_interval: + type: string + checkpoint_distance: + type: integer + compaction_period: + type: string + compaction_threshold: + type: string + TenantConfigInfo: + type: object + properties: + tenant_id: + type: string + format: hex + gc_period: + type: string + gc_horizon: + type: integer + pitr_interval: + type: string + checkpoint_distance: + type: integer + compaction_period: + type: string + compaction_threshold: + type: string TimelineInfo: type: object required: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 82ea5d1d09..2db56015ad 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -6,13 +6,15 @@ use hyper::{Body, Request, Response, Uri}; use tracing::*; use super::models::{ - StatusResponse, 
TenantCreateRequest, TenantCreateResponse, TimelineCreateRequest, + StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, + TimelineCreateRequest, }; use crate::config::RemoteStorageKind; use crate::remote_storage::{ download_index_part, schedule_timeline_download, LocalFs, RemoteIndex, RemoteTimeline, S3Bucket, }; use crate::repository::Repository; +use crate::tenant_config::TenantConfOpt; use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo}; use crate::{config::PageServerConf, tenant_mgr, timelines}; use utils::{ @@ -375,6 +377,27 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result) -> Result) -> Result, ApiError> { + let request_data: TenantConfigRequest = json_request(&mut request).await?; + let tenant_id = request_data.tenant_id; + // check for management permission + check_permission(&request, Some(tenant_id))?; + + let mut tenant_conf: TenantConfOpt = Default::default(); + if let Some(gc_period) = request_data.gc_period { + tenant_conf.gc_period = + Some(humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?); + } + tenant_conf.gc_horizon = request_data.gc_horizon; + + if let Some(pitr_interval) = request_data.pitr_interval { + tenant_conf.pitr_interval = + Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?); + } + + tenant_conf.checkpoint_distance = request_data.checkpoint_distance; + tenant_conf.compaction_target_size = request_data.compaction_target_size; + tenant_conf.compaction_threshold = request_data.compaction_threshold; + + if let Some(compaction_period) = request_data.compaction_period { + tenant_conf.compaction_period = + Some(humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?); + } + + tokio::task::spawn_blocking(move || { + let _enter = info_span!("tenant_config", tenant = ?tenant_id).entered(); + + tenant_mgr::update_tenant_config(tenant_conf, tenant_id) + }) + .await + .map_err(ApiError::from_err)??; + + Ok(json_response(StatusCode::OK, ())?) 
+} + async fn handler_404(_: Request) -> Result, ApiError> { json_response( StatusCode::NOT_FOUND, @@ -426,6 +488,7 @@ pub fn make_router( .get("/v1/status", status_handler) .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) + .put("/v1/tenant/config", tenant_config_handler) .get("/v1/tenant/:tenant_id/timeline", timeline_list_handler) .post("/v1/tenant/:tenant_id/timeline", timeline_create_handler) .get( diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index ff6498a489..3afef51a23 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -29,11 +29,13 @@ use std::ops::{Bound::Included, Deref, Range}; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError}; -use std::time::Instant; +use std::time::{Duration, Instant, SystemTime}; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use crate::config::PageServerConf; use crate::keyspace::KeySpace; +use crate::tenant_config::{TenantConf, TenantConfOpt}; + use crate::page_cache; use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex}; use crate::repository::{ @@ -51,6 +53,7 @@ use metrics::{ register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, }; +use toml_edit; use utils::{ crashsafe_dir, lsn::{AtomicLsn, Lsn, RecordLsn}, @@ -149,7 +152,15 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// Repository consists of multiple timelines. Keep them in a hash table. /// pub struct LayeredRepository { + // Global pageserver config parameters pub conf: &'static PageServerConf, + + // Overridden tenant-specific config parameters. + // We keep TenantConfOpt sturct here to preserve the information + // about parameters that are not set. + // This is necessary to allow global config updates. + tenant_conf: Arc>, + tenantid: ZTenantId, timelines: Mutex>, // This mutex prevents creation of new timelines during GC. 
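For completeness, a minimal sketch of driving the new PUT /v1/tenant/config endpoint from code other than the control plane, reusing TenantConfigRequest from this patch and reqwest's blocking client (which control_plane already uses). The base URL and the particular values are illustrative only:

    use pageserver::http::models::TenantConfigRequest;
    use reqwest::blocking::Client;
    use utils::zid::ZTenantId;

    fn set_tenant_overrides(base_url: &str, tenant_id: ZTenantId) -> anyhow::Result<()> {
        // Only fields set to Some(_) overwrite the tenant's stored overrides;
        // fields left as None are not touched (see TenantConfOpt::update below).
        let mut request = TenantConfigRequest::new(tenant_id);
        request.gc_horizon = Some(1024);
        request.pitr_interval = Some("3600 s".to_string());

        let response = Client::new()
            .put(format!("{}/v1/tenant/config", base_url))
            .json(&request)
            .send()?;
        anyhow::ensure!(
            response.status().is_success(),
            "tenant config update failed: {}",
            response.status()
        );
        Ok(())
    }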
@@ -219,6 +230,7 @@ impl Repository for LayeredRepository { let timeline = LayeredTimeline::new( self.conf, + Arc::clone(&self.tenant_conf), metadata, None, timelineid, @@ -302,6 +314,7 @@ impl Repository for LayeredRepository { &self, target_timelineid: Option, horizon: u64, + pitr: Duration, checkpoint_before_gc: bool, ) -> Result { let timeline_str = target_timelineid @@ -311,7 +324,7 @@ impl Repository for LayeredRepository { STORAGE_TIME .with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str]) .observe_closure_duration(|| { - self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc) + self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc) }) } @@ -480,6 +493,64 @@ impl From for RepositoryTimeline { /// Private functions impl LayeredRepository { + pub fn get_checkpoint_distance(&self) -> u64 { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .checkpoint_distance + .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance) + } + + pub fn get_compaction_target_size(&self) -> u64 { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .compaction_target_size + .unwrap_or(self.conf.default_tenant_conf.compaction_target_size) + } + + pub fn get_compaction_period(&self) -> Duration { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .compaction_period + .unwrap_or(self.conf.default_tenant_conf.compaction_period) + } + + pub fn get_compaction_threshold(&self) -> usize { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .compaction_threshold + .unwrap_or(self.conf.default_tenant_conf.compaction_threshold) + } + + pub fn get_gc_horizon(&self) -> u64 { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .gc_horizon + .unwrap_or(self.conf.default_tenant_conf.gc_horizon) + } + + pub fn get_gc_period(&self) -> Duration { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .gc_period + .unwrap_or(self.conf.default_tenant_conf.gc_period) + } + + pub fn get_pitr_interval(&self) -> Duration { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .pitr_interval + .unwrap_or(self.conf.default_tenant_conf.pitr_interval) + } + + pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> { + let mut tenant_conf = self.tenant_conf.write().unwrap(); + + tenant_conf.update(&new_tenant_conf); + + LayeredRepository::persist_tenant_config(self.conf, self.tenantid, *tenant_conf)?; + Ok(()) + } + // Implementation of the public `get_timeline` function. // Differences from the public: // * interface in that the caller must already hold the mutex on the 'timelines' hashmap. 
@@ -553,8 +624,10 @@ impl LayeredRepository { .flatten() .map(LayeredTimelineEntry::Loaded); let _enter = info_span!("loading local timeline").entered(); + let timeline = LayeredTimeline::new( self.conf, + Arc::clone(&self.tenant_conf), metadata, ancestor, timelineid, @@ -571,6 +644,7 @@ impl LayeredRepository { pub fn new( conf: &'static PageServerConf, + tenant_conf: TenantConfOpt, walredo_mgr: Arc, tenantid: ZTenantId, remote_index: RemoteIndex, @@ -579,6 +653,7 @@ impl LayeredRepository { LayeredRepository { tenantid, conf, + tenant_conf: Arc::new(RwLock::new(tenant_conf)), timelines: Mutex::new(HashMap::new()), gc_cs: Mutex::new(()), walredo_mgr, @@ -587,6 +662,71 @@ impl LayeredRepository { } } + /// Locate and load config + pub fn load_tenant_config( + conf: &'static PageServerConf, + tenantid: ZTenantId, + ) -> anyhow::Result { + let target_config_path = TenantConf::path(conf, tenantid); + + info!("load tenantconf from {}", target_config_path.display()); + + // FIXME If the config file is not found, assume that we're attaching + // a detached tenant and config is passed via attach command. + // https://github.com/neondatabase/neon/issues/1555 + if !target_config_path.exists() { + info!( + "Zenith tenant config is not found in {}", + target_config_path.display() + ); + return Ok(Default::default()); + } + + // load and parse file + let config = fs::read_to_string(target_config_path)?; + + let toml = config.parse::()?; + + let mut tenant_conf: TenantConfOpt = Default::default(); + for (key, item) in toml.iter() { + match key { + "tenant_conf" => { + tenant_conf = PageServerConf::parse_toml_tenant_conf(item)?; + } + _ => bail!("unrecognized pageserver option '{}'", key), + } + } + + Ok(tenant_conf) + } + + pub fn persist_tenant_config( + conf: &'static PageServerConf, + tenantid: ZTenantId, + tenant_conf: TenantConfOpt, + ) -> anyhow::Result<()> { + let _enter = info_span!("saving tenantconf").entered(); + let target_config_path = TenantConf::path(conf, tenantid); + info!("save tenantconf to {}", target_config_path.display()); + + let mut conf_content = r#"# This file contains a specific per-tenant's config. +# It is read in case of pageserver restart. + +# [tenant_config] +"# + .to_string(); + + // Convert the config to a toml file. + conf_content += &toml_edit::easy::to_string(&tenant_conf)?; + + fs::write(&target_config_path, conf_content).with_context(|| { + format!( + "Failed to write config file into path '{}'", + target_config_path.display() + ) + }) + } + /// Save timeline metadata to file fn save_metadata( conf: &'static PageServerConf, @@ -662,6 +802,7 @@ impl LayeredRepository { &self, target_timelineid: Option, horizon: u64, + pitr: Duration, checkpoint_before_gc: bool, ) -> Result { let _span_guard = @@ -738,7 +879,7 @@ impl LayeredRepository { timeline.checkpoint(CheckpointConfig::Forced)?; info!("timeline {} checkpoint_before_gc done", timelineid); } - timeline.update_gc_info(branchpoints, cutoff); + timeline.update_gc_info(branchpoints, cutoff, pitr); let result = timeline.gc()?; totals += result; @@ -753,6 +894,7 @@ impl LayeredRepository { pub struct LayeredTimeline { conf: &'static PageServerConf, + tenant_conf: Arc>, tenantid: ZTenantId, timelineid: ZTimelineId, @@ -857,6 +999,11 @@ struct GcInfo { /// /// FIXME: is this inclusive or exclusive? 
cutoff: Lsn, + + /// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()' + /// minus 'pitr_interval' + /// + pitr: Duration, } /// Public interface functions @@ -987,12 +1134,34 @@ impl Timeline for LayeredTimeline { } impl LayeredTimeline { + fn get_checkpoint_distance(&self) -> u64 { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .checkpoint_distance + .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance) + } + + fn get_compaction_target_size(&self) -> u64 { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .compaction_target_size + .unwrap_or(self.conf.default_tenant_conf.compaction_target_size) + } + + fn get_compaction_threshold(&self) -> usize { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .compaction_threshold + .unwrap_or(self.conf.default_tenant_conf.compaction_threshold) + } + /// Open a Timeline handle. /// /// Loads the metadata for the timeline into memory, but not the layer map. #[allow(clippy::too_many_arguments)] fn new( conf: &'static PageServerConf, + tenant_conf: Arc>, metadata: TimelineMetadata, ancestor: Option, timelineid: ZTimelineId, @@ -1036,6 +1205,7 @@ impl LayeredTimeline { LayeredTimeline { conf, + tenant_conf, timelineid, tenantid, layers: RwLock::new(LayerMap::default()), @@ -1071,6 +1241,7 @@ impl LayeredTimeline { gc_info: RwLock::new(GcInfo { retain_lsns: Vec::new(), cutoff: Lsn(0), + pitr: Duration::ZERO, }), latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), @@ -1431,7 +1602,7 @@ impl LayeredTimeline { let last_lsn = self.get_last_record_lsn(); let distance = last_lsn.widening_sub(self.last_freeze_at.load()); - if distance >= self.conf.checkpoint_distance.into() { + if distance >= self.get_checkpoint_distance().into() { self.freeze_inmem_layer(true); self.last_freeze_at.store(last_lsn); } @@ -1640,13 +1811,15 @@ impl LayeredTimeline { // above. Rewrite it. let _compaction_cs = self.compaction_cs.lock().unwrap(); - let target_file_size = self.conf.checkpoint_distance; + let target_file_size = self.get_checkpoint_distance(); // Define partitioning schema if needed if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid) { - let (partitioning, lsn) = - pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?; + let (partitioning, lsn) = pgdir.repartition( + self.get_last_record_lsn(), + self.get_compaction_target_size(), + )?; let timer = self.create_images_time_histo.start_timer(); // 2. Create new image layers for partitions that have been modified // "enough". @@ -1747,7 +1920,7 @@ impl LayeredTimeline { // We compact or "shuffle" the level-0 delta layers when they've // accumulated over the compaction threshold. - if level0_deltas.len() < self.conf.compaction_threshold { + if level0_deltas.len() < self.get_compaction_threshold() { return Ok(()); } drop(layers); @@ -1870,10 +2043,11 @@ impl LayeredTimeline { /// the latest LSN subtracted by a constant, and doesn't do anything smart /// to figure out what read-only nodes might actually need.) /// - fn update_gc_info(&self, retain_lsns: Vec, cutoff: Lsn) { + fn update_gc_info(&self, retain_lsns: Vec, cutoff: Lsn, pitr: Duration) { let mut gc_info = self.gc_info.write().unwrap(); gc_info.retain_lsns = retain_lsns; gc_info.cutoff = cutoff; + gc_info.pitr = pitr; } /// @@ -1884,7 +2058,7 @@ impl LayeredTimeline { /// obsolete. 
/// fn gc(&self) -> Result { - let now = Instant::now(); + let now = SystemTime::now(); let mut result: GcResult = Default::default(); let disk_consistent_lsn = self.get_disk_consistent_lsn(); @@ -1893,6 +2067,7 @@ impl LayeredTimeline { let gc_info = self.gc_info.read().unwrap(); let retain_lsns = &gc_info.retain_lsns; let cutoff = gc_info.cutoff; + let pitr = gc_info.pitr; let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered(); @@ -1910,8 +2085,9 @@ impl LayeredTimeline { // // Garbage collect the layer if all conditions are satisfied: // 1. it is older than cutoff LSN; - // 2. it doesn't need to be retained for 'retain_lsns'; - // 3. newer on-disk image layers cover the layer's whole key range + // 2. it is older than PITR interval; + // 3. it doesn't need to be retained for 'retain_lsns'; + // 4. newer on-disk image layers cover the layer's whole key range // let mut layers = self.layers.write().unwrap(); 'outer: for l in layers.iter_historic_layers() { @@ -1937,8 +2113,31 @@ impl LayeredTimeline { result.layers_needed_by_cutoff += 1; continue 'outer; } - - // 2. Is it needed by a child branch? + // 2. It is newer than PiTR interval? + // We use modification time of layer file to estimate update time. + // This estimation is not quite precise but maintaining LSN->timestamp map seems to be overkill. + // It is not expected that users will need high precision here. And this estimation + // is conservative: modification time of file is always newer than actual time of version + // creation. So it is safe for users. + // TODO A possible "bloat" issue still persists here. + // If modification time changes because of layer upload/download, we will keep these files + // longer than necessary. + // https://github.com/neondatabase/neon/issues/1554 + // + if let Ok(metadata) = fs::metadata(&l.filename()) { + let last_modified = metadata.modified()?; + if now.duration_since(last_modified)? < pitr { + debug!( + "keeping {} because it's modification time {:?} is newer than PITR {:?}", + l.filename().display(), + last_modified, + pitr + ); + result.layers_needed_by_pitr += 1; + continue 'outer; + } + } + // 3. Is it needed by a child branch? // NOTE With that wee would keep data that // might be referenced by child branches forever. // We can track this in child timeline GC and delete parent layers when @@ -1957,7 +2156,7 @@ impl LayeredTimeline { } } - // 3. Is there a later on-disk layer for this relation? + // 4. Is there a later on-disk layer for this relation? // // The end-LSN is exclusive, while disk_consistent_lsn is // inclusive. 
For example, if disk_consistent_lsn is 100, it is @@ -1998,7 +2197,7 @@ impl LayeredTimeline { result.layers_removed += 1; } - result.elapsed = now.elapsed(); + result.elapsed = now.elapsed()?; Ok(result) } @@ -2275,7 +2474,8 @@ pub mod tests { } let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff); + + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2345,7 +2545,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2422,7 +2622,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index a761f0dfe2..94219c7840 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -11,6 +11,7 @@ pub mod profiling; pub mod reltag; pub mod remote_storage; pub mod repository; +pub mod tenant_config; pub mod tenant_mgr; pub mod tenant_threads; pub mod thread_mgr; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8c90195131..58d617448a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -19,6 +19,7 @@ use std::net::TcpListener; use std::str; use std::str::FromStr; use std::sync::{Arc, RwLockReadGuard}; +use std::time::Duration; use tracing::*; use utils::{ auth::{self, Claims, JwtAuth, Scope}, @@ -676,6 +677,37 @@ impl postgres_backend::Handler for PageServerHandler { } } pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("show ") { + // show + let (_, params_raw) = query_string.split_at("show ".len()); + let params = params_raw.split(' ').collect::>(); + ensure!(params.len() == 1, "invalid param number for config command"); + let tenantid = ZTenantId::from_str(params[0])?; + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + pgb.write_message_noflush(&BeMessage::RowDescription(&[ + RowDescriptor::int8_col(b"checkpoint_distance"), + RowDescriptor::int8_col(b"compaction_target_size"), + RowDescriptor::int8_col(b"compaction_period"), + RowDescriptor::int8_col(b"compaction_threshold"), + RowDescriptor::int8_col(b"gc_horizon"), + RowDescriptor::int8_col(b"gc_period"), + RowDescriptor::int8_col(b"pitr_interval"), + ]))? + .write_message_noflush(&BeMessage::DataRow(&[ + Some(repo.get_checkpoint_distance().to_string().as_bytes()), + Some(repo.get_compaction_target_size().to_string().as_bytes()), + Some( + repo.get_compaction_period() + .as_secs() + .to_string() + .as_bytes(), + ), + Some(repo.get_compaction_threshold().to_string().as_bytes()), + Some(repo.get_gc_horizon().to_string().as_bytes()), + Some(repo.get_gc_period().as_secs().to_string().as_bytes()), + Some(repo.get_pitr_interval().as_secs().to_string().as_bytes()), + ]))? + .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("do_gc ") { // Run GC immediately on given timeline. // FIXME: This is just for tests. See test_runner/batch_others/test_gc.py. 
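The new `show <tenantid>` command exposes the effective (merged) per-tenant settings over the pageserver's libpq port. A quick way to read them from Rust, assuming the postgres crate that control_plane already pulls in (the connection string and the subset of columns printed are illustrative):

    use postgres::{Client, NoTls, SimpleQueryMessage};

    fn print_tenant_settings(connstr: &str, tenant_id: &str) -> anyhow::Result<()> {
        let mut client = Client::connect(connstr, NoTls)?;
        for message in client.simple_query(&format!("show {}", tenant_id))? {
            if let SimpleQueryMessage::Row(row) = message {
                // Column order follows the RowDescription above:
                // checkpoint_distance, compaction_target_size, compaction_period,
                // compaction_threshold, gc_horizon, gc_period, pitr_interval.
                println!("checkpoint_distance = {:?}", row.get(0));
                println!("gc_horizon          = {:?}", row.get(4));
                println!("pitr_interval       = {:?}", row.get(6));
            }
        }
        Ok(())
    }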
@@ -693,16 +725,20 @@ impl postgres_backend::Handler for PageServerHandler { let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + let gc_horizon: u64 = caps .get(4) .map(|h| h.as_str().parse()) - .unwrap_or(Ok(self.conf.gc_horizon))?; + .unwrap_or_else(|| Ok(repo.get_gc_horizon()))?; let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; + let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"layers_total"), RowDescriptor::int8_col(b"layers_needed_by_cutoff"), + RowDescriptor::int8_col(b"layers_needed_by_pitr"), RowDescriptor::int8_col(b"layers_needed_by_branches"), RowDescriptor::int8_col(b"layers_not_updated"), RowDescriptor::int8_col(b"layers_removed"), @@ -711,6 +747,7 @@ impl postgres_backend::Handler for PageServerHandler { .write_message_noflush(&BeMessage::DataRow(&[ Some(result.layers_total.to_string().as_bytes()), Some(result.layers_needed_by_cutoff.to_string().as_bytes()), + Some(result.layers_needed_by_pitr.to_string().as_bytes()), Some(result.layers_needed_by_branches.to_string().as_bytes()), Some(result.layers_not_updated.to_string().as_bytes()), Some(result.layers_removed.to_string().as_bytes()), diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index fc438cce9c..f7c2f036a6 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -249,6 +249,7 @@ pub trait Repository: Send + Sync { &self, timelineid: Option, horizon: u64, + pitr: Duration, checkpoint_before_gc: bool, ) -> Result; @@ -305,6 +306,7 @@ impl<'a, T> From<&'a RepositoryTimeline> for LocalTimelineState { pub struct GcResult { pub layers_total: u64, pub layers_needed_by_cutoff: u64, + pub layers_needed_by_pitr: u64, pub layers_needed_by_branches: u64, pub layers_not_updated: u64, pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files. @@ -315,6 +317,7 @@ pub struct GcResult { impl AddAssign for GcResult { fn add_assign(&mut self, other: Self) { self.layers_total += other.layers_total; + self.layers_needed_by_pitr += other.layers_needed_by_pitr; self.layers_needed_by_cutoff += other.layers_needed_by_cutoff; self.layers_needed_by_branches += other.layers_needed_by_branches; self.layers_not_updated += other.layers_not_updated; @@ -432,6 +435,7 @@ pub mod repo_harness { }; use super::*; + use crate::tenant_config::{TenantConf, TenantConfOpt}; use hex_literal::hex; use utils::zid::ZTenantId; @@ -454,8 +458,23 @@ pub mod repo_harness { static ref LOCK: RwLock<()> = RwLock::new(()); } + impl From for TenantConfOpt { + fn from(tenant_conf: TenantConf) -> Self { + Self { + checkpoint_distance: Some(tenant_conf.checkpoint_distance), + compaction_target_size: Some(tenant_conf.compaction_target_size), + compaction_period: Some(tenant_conf.compaction_period), + compaction_threshold: Some(tenant_conf.compaction_threshold), + gc_horizon: Some(tenant_conf.gc_horizon), + gc_period: Some(tenant_conf.gc_period), + pitr_interval: Some(tenant_conf.pitr_interval), + } + } + } + pub struct RepoHarness<'a> { pub conf: &'static PageServerConf, + pub tenant_conf: TenantConf, pub tenant_id: ZTenantId, pub lock_guard: ( @@ -487,12 +506,15 @@ pub mod repo_harness { // OK in a test. 
let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + let tenant_conf = TenantConf::dummy_conf(); + let tenant_id = ZTenantId::generate(); fs::create_dir_all(conf.tenant_path(&tenant_id))?; fs::create_dir_all(conf.timelines_path(&tenant_id))?; Ok(Self { conf, + tenant_conf, tenant_id, lock_guard, }) @@ -507,6 +529,7 @@ pub mod repo_harness { let repo = LayeredRepository::new( self.conf, + TenantConfOpt::from(self.tenant_conf), walredo_mgr, self.tenant_id, RemoteIndex::empty(), @@ -722,7 +745,7 @@ mod tests { // FIXME: this doesn't actually remove any layer currently, given how the checkpointing // and compaction works. But it does set the 'cutoff' point so that the cross check // below should fail. - repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; + repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; // try to branch at lsn 25, should fail because we already garbage collected the data match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) { @@ -773,7 +796,7 @@ mod tests { let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; make_some_layers(tline.as_ref(), Lsn(0x20))?; - repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; + repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn(); assert!(*latest_gc_cutoff_lsn > Lsn(0x25)); match tline.get(*TEST_KEY, Lsn(0x25)) { @@ -796,7 +819,7 @@ mod tests { .get_timeline_load(NEW_TIMELINE_ID) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 - repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; + repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok()); Ok(()) @@ -815,7 +838,7 @@ mod tests { make_some_layers(newtline.as_ref(), Lsn(0x60))?; // run gc on parent - repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; + repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; // Check that the data is still accessible on the branch. assert_eq!( diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs new file mode 100644 index 0000000000..818b6de1b1 --- /dev/null +++ b/pageserver/src/tenant_config.rs @@ -0,0 +1,162 @@ +//! Functions for handling per-tenant configuration options +//! +//! If tenant is created with --config option, +//! the tenant-specific config will be stored in tenant's directory. +//! Otherwise, global pageserver's config is used. +//! +//! If the tenant config file is corrupted, the tenant will be disabled. +//! We cannot use global or default config instead, because wrong settings +//! may lead to a data loss. +//! +use crate::config::PageServerConf; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::time::Duration; +use utils::zid::ZTenantId; + +pub const TENANT_CONFIG_NAME: &str = "config"; + +pub mod defaults { + // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB + // would be more appropriate. But a low value forces the code to be exercised more, + // which is good for now to trigger bugs. + // This parameter actually determines L0 layer file size. + pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; + + // Target file size, when creating image and delta layers. + // This parameter determines L1 layer file size. 
+ pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024; + + pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s"; + pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10; + + pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; + pub const DEFAULT_GC_PERIOD: &str = "100 s"; + pub const DEFAULT_PITR_INTERVAL: &str = "30 days"; +} + +/// Per-tenant configuration options +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenantConf { + // Flush out an inmemory layer, if it's holding WAL older than this + // This puts a backstop on how much WAL needs to be re-digested if the + // page server crashes. + // This parameter actually determines L0 layer file size. + pub checkpoint_distance: u64, + // Target file size, when creating image and delta layers. + // This parameter determines L1 layer file size. + pub compaction_target_size: u64, + // How often to check if there's compaction work to be done. + pub compaction_period: Duration, + // Level0 delta layer threshold for compaction. + pub compaction_threshold: usize, + // Determines how much history is retained, to allow + // branching and read replicas at an older point in time. + // The unit is #of bytes of WAL. + // Page versions older than this are garbage collected away. + pub gc_horizon: u64, + // Interval at which garbage collection is triggered. + pub gc_period: Duration, + // Determines how much history is retained, to allow + // branching and read replicas at an older point in time. + // The unit is time. + // Page versions older than this are garbage collected away. + pub pitr_interval: Duration, +} + +/// Same as TenantConf, but this struct preserves the information about +/// which parameters are set and which are not. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct TenantConfOpt { + pub checkpoint_distance: Option, + pub compaction_target_size: Option, + pub compaction_period: Option, + pub compaction_threshold: Option, + pub gc_horizon: Option, + pub gc_period: Option, + pub pitr_interval: Option, +} + +impl TenantConfOpt { + pub fn merge(&self, global_conf: TenantConf) -> TenantConf { + TenantConf { + checkpoint_distance: self + .checkpoint_distance + .unwrap_or(global_conf.checkpoint_distance), + compaction_target_size: self + .compaction_target_size + .unwrap_or(global_conf.compaction_target_size), + compaction_period: self + .compaction_period + .unwrap_or(global_conf.compaction_period), + compaction_threshold: self + .compaction_threshold + .unwrap_or(global_conf.compaction_threshold), + gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon), + gc_period: self.gc_period.unwrap_or(global_conf.gc_period), + pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval), + } + } + + pub fn update(&mut self, other: &TenantConfOpt) { + if let Some(checkpoint_distance) = other.checkpoint_distance { + self.checkpoint_distance = Some(checkpoint_distance); + } + if let Some(compaction_target_size) = other.compaction_target_size { + self.compaction_target_size = Some(compaction_target_size); + } + if let Some(compaction_period) = other.compaction_period { + self.compaction_period = Some(compaction_period); + } + if let Some(compaction_threshold) = other.compaction_threshold { + self.compaction_threshold = Some(compaction_threshold); + } + if let Some(gc_horizon) = other.gc_horizon { + self.gc_horizon = Some(gc_horizon); + } + if let Some(gc_period) = other.gc_period { + self.gc_period = Some(gc_period); + } + if let Some(pitr_interval) = 
other.pitr_interval { + self.pitr_interval = Some(pitr_interval); + } + } +} + +impl TenantConf { + pub fn default() -> TenantConf { + use defaults::*; + + TenantConf { + checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE, + compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE, + compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD) + .expect("cannot parse default compaction period"), + compaction_threshold: DEFAULT_COMPACTION_THRESHOLD, + gc_horizon: DEFAULT_GC_HORIZON, + gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD) + .expect("cannot parse default gc period"), + pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL) + .expect("cannot parse default PITR interval"), + } + } + + /// Points to a place in pageserver's local directory, + /// where certain tenant's tenantconf file should be located. + pub fn path(conf: &'static PageServerConf, tenantid: ZTenantId) -> PathBuf { + conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME) + } + + #[cfg(test)] + pub fn dummy_conf() -> Self { + TenantConf { + checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, + compaction_target_size: 4 * 1024 * 1024, + compaction_period: Duration::from_secs(10), + compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD, + gc_horizon: defaults::DEFAULT_GC_HORIZON, + gc_period: Duration::from_secs(10), + pitr_interval: Duration::from_secs(60 * 60), + } + } +} diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 33bb4dc2e0..8a69062dba 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -5,6 +5,7 @@ use crate::config::PageServerConf; use crate::layered_repository::LayeredRepository; use crate::remote_storage::RemoteIndex; use crate::repository::{Repository, TimelineSyncStatusUpdate}; +use crate::tenant_config::TenantConfOpt; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::timelines; @@ -63,13 +64,13 @@ fn access_tenants() -> MutexGuard<'static, HashMap> { TENANTS.lock().unwrap() } -// Sets up wal redo manager and repository for tenant. Reduces code duplocation. +// Sets up wal redo manager and repository for tenant. Reduces code duplication. // Used during pageserver startup, or when new tenant is attached to pageserver. pub fn load_local_repo( conf: &'static PageServerConf, tenant_id: ZTenantId, remote_index: &RemoteIndex, -) -> Arc { +) -> Result> { let mut m = access_tenants(); let tenant = m.entry(tenant_id).or_insert_with(|| { // Set up a WAL redo manager, for applying WAL records. @@ -78,6 +79,7 @@ pub fn load_local_repo( // Set up an object repository, for actual data storage. let repo: Arc = Arc::new(LayeredRepository::new( conf, + Default::default(), Arc::new(walredo_mgr), tenant_id, remote_index.clone(), @@ -89,7 +91,12 @@ pub fn load_local_repo( timelines: HashMap::new(), } }); - Arc::clone(&tenant.repo) + + // Restore tenant config + let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?; + tenant.repo.update_tenant_config(tenant_conf)?; + + Ok(Arc::clone(&tenant.repo)) } /// Updates tenants' repositories, changing their timelines state in memory. 
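merge() above and the get_*() accessors in layered_repository.rs implement the same precedence: a value present in the tenant's own TenantConfOpt wins, and anything left unset falls back to the pageserver-wide default_tenant_conf. A small illustrative test, assuming a #[cfg(test)] module inside tenant_config.rs (field values are arbitrary):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn tenant_overrides_take_precedence() {
            let global = TenantConf::default(); // gc_horizon = 64 MiB, pitr_interval = 30 days, ...

            let overrides = TenantConfOpt {
                gc_horizon: Some(1024),
                ..Default::default()
            };

            let effective = overrides.merge(global);
            assert_eq!(effective.gc_horizon, 1024);
            // Fields that were never overridden keep the global defaults.
            assert_eq!(effective.checkpoint_distance, global.checkpoint_distance);
            assert_eq!(effective.pitr_interval, global.pitr_interval);
        }
    }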
@@ -109,7 +116,16 @@ pub fn apply_timeline_sync_status_updates( trace!("Sync status updates: {:?}", sync_status_updates); for (tenant_id, tenant_timelines_sync_status_updates) in sync_status_updates { - let repo = load_local_repo(conf, tenant_id, remote_index); + let repo = match load_local_repo(conf, tenant_id, remote_index) { + Ok(repo) => repo, + Err(e) => { + error!( + "Failed to load repo for tenant {} Error: {:#}", + tenant_id, e + ); + continue; + } + }; for (timeline_id, timeline_sync_status_update) in tenant_timelines_sync_status_updates { match repo.apply_timeline_remote_sync_status_update(timeline_id, timeline_sync_status_update) @@ -174,6 +190,7 @@ pub fn shutdown_all_tenants() { pub fn create_tenant_repository( conf: &'static PageServerConf, + tenant_conf: TenantConfOpt, tenantid: ZTenantId, remote_index: RemoteIndex, ) -> Result> { @@ -186,6 +203,7 @@ pub fn create_tenant_repository( let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); let repo = timelines::create_repo( conf, + tenant_conf, tenantid, CreateRepo::Real { wal_redo_manager, @@ -202,6 +220,14 @@ pub fn create_tenant_repository( } } +pub fn update_tenant_config(tenant_conf: TenantConfOpt, tenantid: ZTenantId) -> Result<()> { + info!("configuring tenant {}", tenantid); + let repo = get_repository_for_tenant(tenantid)?; + + repo.update_tenant_config(tenant_conf)?; + Ok(()) +} + pub fn get_tenant_state(tenantid: ZTenantId) -> Option { Some(access_tenants().get(&tenantid)?.state) } @@ -210,7 +236,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option { /// Change the state of a tenant to Active and launch its compactor and GC /// threads. If the tenant was already in Active state or Stopping, does nothing. /// -pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> { +pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> { let mut m = access_tenants(); let tenant = m .get_mut(&tenant_id) @@ -230,7 +256,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R None, "Compactor thread", true, - move || crate::tenant_threads::compact_loop(tenant_id, conf), + move || crate::tenant_threads::compact_loop(tenant_id), )?; let gc_spawn_result = thread_mgr::spawn( @@ -239,7 +265,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R None, "GC thread", true, - move || crate::tenant_threads::gc_loop(tenant_id, conf), + move || crate::tenant_threads::gc_loop(tenant_id), ) .with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id)); @@ -251,7 +277,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); return gc_spawn_result; } - tenant.state = TenantState::Active; } @@ -290,7 +315,7 @@ pub fn get_timeline_for_tenant_load( .get_timeline_load(timelineid) .with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?; - let repartition_distance = tenant.repo.conf.checkpoint_distance / 10; + let repartition_distance = tenant.repo.get_checkpoint_distance() / 10; let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance)); page_tline.init_logical_size()?; diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 4dcc15f817..b904d9040d 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,6 +1,5 @@ //! 
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
-use crate::config::PageServerConf;
 use crate::repository::Repository;
 use crate::tenant_mgr;
 use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use utils::zid::ZTenantId;
 ///
 /// Compaction thread's main loop
 ///
-pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
-    if let Err(err) = compact_loop_ext(tenantid, conf) {
+pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
+    if let Err(err) = compact_loop_ext(tenantid) {
         error!("compact loop terminated with error: {:?}", err);
         Err(err)
     } else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
     }
 }
 
-fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
     loop {
         if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
             break;
         }
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        let compaction_period = repo.get_compaction_period();
 
-        std::thread::sleep(conf.compaction_period);
+        std::thread::sleep(compaction_period);
         trace!("compaction thread for tenant {} waking up", tenantid);
 
         // Compact timelines
@@ -46,23 +47,23 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
 ///
 /// GC thread's main loop
 ///
-pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
     loop {
         if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
             break;
         }
         trace!("gc thread for tenant {} waking up", tenantid);
-
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        let gc_horizon = repo.get_gc_horizon();
         // Garbage collect old files that are not needed for PITR anymore
-        if conf.gc_horizon > 0 {
-            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-            repo.gc_iteration(None, conf.gc_horizon, false)?;
+        if gc_horizon > 0 {
+            repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
         }
 
         // TODO Write it in more adequate way using
         // condvar.wait_timeout() or something
-        let mut sleep_time = conf.gc_period.as_secs();
+        let mut sleep_time = repo.get_gc_period().as_secs();
         while sleep_time > 0
             && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
         {
            sleep_time -= 1;
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs
index abbabc8b31..adc531e6bb 100644
--- a/pageserver/src/timelines.rs
+++ b/pageserver/src/timelines.rs
@@ -25,6 +25,7 @@ use crate::{
     layered_repository::metadata::TimelineMetadata,
     remote_storage::RemoteIndex,
     repository::{LocalTimelineState, Repository},
+    tenant_config::TenantConfOpt,
     DatadirTimeline, RepositoryImpl,
 };
 use crate::{import_datadir, LOG_FILE_NAME};
@@ -151,8 +152,8 @@ pub fn init_pageserver(
 
     if let Some(tenant_id) = create_tenant {
         println!("initializing tenantid {}", tenant_id);
-        let repo =
-            create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
+        let repo = create_repo(conf, Default::default(), tenant_id, CreateRepo::Dummy)
+            .context("failed to create repo")?;
         let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
         bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
             .context("failed to create initial timeline")?;
@@ -175,6 +176,7 @@ pub enum CreateRepo {
 
 pub fn create_repo(
     conf: &'static PageServerConf,
+    tenant_conf: TenantConfOpt,
     tenant_id: ZTenantId,
     create_repo: CreateRepo,
 ) -> Result<Arc<RepositoryImpl>> {
@@ -211,8 +213,12 @@ pub fn create_repo(
     crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
     info!("created directory structure in {}", repo_dir.display());
 
+    // Save tenant's config
+    LayeredRepository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
+
     Ok(Arc::new(LayeredRepository::new(
         conf,
+        tenant_conf,
         wal_redo_manager,
         tenant_id,
         remote_index,
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index ce4e4d45fb..357aab7221 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
             receivers.insert((tenantid, timelineid), receiver);
 
             // Update tenant state and start tenant threads, if they are not running yet.
-            tenant_mgr::activate_tenant(conf, tenantid)?;
+            tenant_mgr::activate_tenant(tenantid)?;
         }
     };
     Ok(())
diff --git a/test_runner/batch_others/test_tenant_conf.py b/test_runner/batch_others/test_tenant_conf.py
new file mode 100644
index 0000000000..f74e6aad1d
--- /dev/null
+++ b/test_runner/batch_others/test_tenant_conf.py
@@ -0,0 +1,49 @@
+from contextlib import closing
+
+import pytest
+
+from fixtures.zenith_fixtures import ZenithEnvBuilder
+
+
+def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
+    """Test per tenant configuration"""
+    env = zenith_env_builder.init_start()
+    tenant = env.zenith_cli.create_tenant(
+        conf={
+            'checkpoint_distance': '10000',
+            'compaction_target_size': '1048576',
+            'compaction_period': '60sec',
+            'compaction_threshold': '20',
+            'gc_horizon': '1024',
+            'gc_period': '100sec',
+            'pitr_interval': '3600sec',
+        })
+
+    env.zenith_cli.create_timeline('test_tenant_conf', tenant_id=tenant)
+    pg = env.postgres.create_start(
+        "test_tenant_conf",
+        "main",
+        tenant,
+    )
+
+    with closing(env.pageserver.connect()) as psconn:
+        with psconn.cursor() as pscur:
+            pscur.execute(f"show {tenant.hex}")
+            assert pscur.fetchone() == (10000, 1048576, 60, 20, 1024, 100, 3600)
+
+    # update the config and ensure that it has changed
+    env.zenith_cli.config_tenant(tenant_id=tenant,
+                                 conf={
+                                     'checkpoint_distance': '100000',
+                                     'compaction_target_size': '1048576',
+                                     'compaction_period': '30sec',
+                                     'compaction_threshold': '15',
+                                     'gc_horizon': '256',
+                                     'gc_period': '10sec',
+                                     'pitr_interval': '360sec',
+                                 })
+
+    with closing(env.pageserver.connect()) as psconn:
+        with psconn.cursor() as pscur:
+            pscur.execute(f"show {tenant.hex}")
+            assert pscur.fetchone() == (100000, 1048576, 30, 15, 256, 10, 360)
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 9a2d6cdc88..d295a79953 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -835,16 +835,35 @@ class ZenithCli:
         self.env = env
         pass
 
-    def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
+    def create_tenant(self,
+                      tenant_id: Optional[uuid.UUID] = None,
+                      conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
         """
         Creates a new tenant, returns its id and its initial timeline's id.
""" if tenant_id is None: tenant_id = uuid.uuid4() - res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex]) + if conf is None: + res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex]) + else: + res = self.raw_cli( + ['tenant', 'create', '--tenant-id', tenant_id.hex] + + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), [])) res.check_returncode() return tenant_id + def config_tenant(self, tenant_id: uuid.UUID, conf: Dict[str, str]): + """ + Update tenant config. + """ + if conf is None: + res = self.raw_cli(['tenant', 'config', '--tenant-id', tenant_id.hex]) + else: + res = self.raw_cli( + ['tenant', 'config', '--tenant-id', tenant_id.hex] + + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), [])) + res.check_returncode() + def list_tenants(self) -> 'subprocess.CompletedProcess[str]': res = self.raw_cli(['tenant', 'list']) res.check_returncode() diff --git a/zenith/src/main.rs b/zenith/src/main.rs index afbbbe395b..cd0cf470e8 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -166,7 +166,12 @@ fn main() -> Result<()> { .subcommand(App::new("create") .arg(tenant_id_arg.clone()) .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) - ) + .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) + ) + .subcommand(App::new("config") + .arg(tenant_id_arg.clone()) + .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) + ) ) .subcommand( App::new("pageserver") @@ -523,8 +528,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re } Some(("create", create_match)) => { let initial_tenant_id = parse_tenant_id(create_match)?; + let tenant_conf: HashMap<_, _> = create_match + .values_of("config") + .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) + .unwrap_or_default(); let new_tenant_id = pageserver - .tenant_create(initial_tenant_id)? + .tenant_create(initial_tenant_id, tenant_conf)? .ok_or_else(|| { anyhow!("Tenant with id {:?} was already created", initial_tenant_id) })?; @@ -533,6 +542,27 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re new_tenant_id ); } + Some(("config", create_match)) => { + let tenant_id = get_tenant_id(create_match, env)?; + let tenant_conf: HashMap<_, _> = create_match + .values_of("config") + .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) + .unwrap_or_default(); + + pageserver + .tenant_config(tenant_id, tenant_conf) + .unwrap_or_else(|e| { + anyhow!( + "Tenant config failed for tenant with id {} : {}", + tenant_id, + e + ); + }); + println!( + "tenant {} successfully configured on the pageserver", + tenant_id + ); + } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), }