Mirror of https://github.com/neondatabase/neon.git

Compare commits: mx_offset_… → tenant_con… (9 commits). The diff below threads a new per-tenant configuration (TenantConf) through the pageserver and adds a PITR retention interval to garbage collection.

| SHA1 |
|---|
| 4d3b15fc3e |
| c93d31f9db |
| 47ece2be8b |
| 3da8233f08 |
| 51c7ceb1d9 |
| 5169d664c5 |
| 1bb364b5bc |
| 7e6eff4969 |
| baac8ac410 |
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::io::Write;
 use std::net::TcpStream;
 use std::path::PathBuf;

@@ -342,10 +343,25 @@ impl PageServerNode {
     pub fn tenant_create(
         &self,
         new_tenant_id: Option<ZTenantId>,
+        settings: HashMap<&str, &str>,
     ) -> anyhow::Result<Option<ZTenantId>> {
         let tenant_id_string = self
             .http_request(Method::POST, format!("{}/tenant", self.http_base_url))
-            .json(&TenantCreateRequest { new_tenant_id })
+            .json(&TenantCreateRequest {
+                new_tenant_id,
+                checkpoint_distance: settings
+                    .get("checkpoint_distance")
+                    .map(|x| x.parse::<u64>().unwrap()),
+                compaction_target_size: settings
+                    .get("compaction_target_size")
+                    .map(|x| x.parse::<u64>().unwrap()),
+                compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
+                gc_horizon: settings
+                    .get("gc_horizon")
+                    .map(|x| x.parse::<u64>().unwrap()),
+                gc_period: settings.get("gc_period").map(|x| x.to_string()),
+                pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
+            })
             .send()?
             .error_from_body()?
             .json::<Option<String>>()?;
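For orientation, the new `settings` map lets callers pass tenant options as plain strings; numeric values are parsed client-side with `str::parse::<u64>()`, while durations are forwarded verbatim for the server to parse. A minimal sketch of a caller, assuming a `pageserver: PageServerNode` handle (the variable name is hypothetical):

use std::collections::HashMap;

// Hypothetical caller: override two GC-related knobs at tenant creation and
// let every other option fall back to the pageserver defaults.
let mut settings: HashMap<&str, &str> = HashMap::new();
settings.insert("gc_horizon", "67108864"); // 64 MiB, parsed client-side as u64
settings.insert("pitr_interval", "30 days"); // forwarded verbatim, parsed server-side
let tenant_id = pageserver.tenant_create(None, settings)?;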
@@ -27,21 +27,6 @@ pub mod defaults {
     pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
     pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
 
-    // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
-    // would be more appropriate. But a low value forces the code to be exercised more,
-    // which is good for now to trigger bugs.
-    // This parameter actually determines L0 layer file size.
-    pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
-
-    // Target file size, when creating image and delta layers.
-    // This parameter determines L1 layer file size.
-    pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
-
-    pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
-
-    pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
-    pub const DEFAULT_GC_PERIOD: &str = "100 s";
-
     pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
     pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
 

@@ -62,13 +47,6 @@ pub mod defaults {
 #listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
 #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
 
-#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
-#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
-#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
-
-#gc_period = '{DEFAULT_GC_PERIOD}'
-#gc_horizon = {DEFAULT_GC_HORIZON}
-
 #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
 #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
 
@@ -94,22 +72,6 @@ pub struct PageServerConf {
     /// Example (default): 127.0.0.1:9898
     pub listen_http_addr: String,
 
-    // Flush out an inmemory layer, if it's holding WAL older than this
-    // This puts a backstop on how much WAL needs to be re-digested if the
-    // page server crashes.
-    // This parameter actually determines L0 layer file size.
-    pub checkpoint_distance: u64,
-
-    // Target file size, when creating image and delta layers.
-    // This parameter determines L1 layer file size.
-    pub compaction_target_size: u64,
-
-    // How often to check if there's compaction work to be done.
-    pub compaction_period: Duration,
-
-    pub gc_horizon: u64,
-    pub gc_period: Duration,
-
     // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
     pub wait_lsn_timeout: Duration,
     // How long to wait for WAL redo to complete.

@@ -158,14 +120,6 @@ struct PageServerConfigBuilder {
 
     listen_http_addr: BuilderValue<String>,
 
-    checkpoint_distance: BuilderValue<u64>,
-
-    compaction_target_size: BuilderValue<u64>,
-    compaction_period: BuilderValue<Duration>,
-
-    gc_horizon: BuilderValue<u64>,
-    gc_period: BuilderValue<Duration>,
-
     wait_lsn_timeout: BuilderValue<Duration>,
     wal_redo_timeout: BuilderValue<Duration>,
 
@@ -194,13 +148,6 @@ impl Default for PageServerConfigBuilder {
         Self {
             listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
             listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
-            checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
-            compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE),
-            compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
-                .expect("cannot parse default compaction period")),
-            gc_horizon: Set(DEFAULT_GC_HORIZON),
-            gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
-                .expect("cannot parse default gc period")),
             wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
                 .expect("cannot parse default wait lsn timeout")),
             wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
@@ -229,26 +176,6 @@ impl PageServerConfigBuilder {
         self.listen_http_addr = BuilderValue::Set(listen_http_addr)
     }
 
-    pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
-        self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
-    }
-
-    pub fn compaction_target_size(&mut self, compaction_target_size: u64) {
-        self.compaction_target_size = BuilderValue::Set(compaction_target_size)
-    }
-
-    pub fn compaction_period(&mut self, compaction_period: Duration) {
-        self.compaction_period = BuilderValue::Set(compaction_period)
-    }
-
-    pub fn gc_horizon(&mut self, gc_horizon: u64) {
-        self.gc_horizon = BuilderValue::Set(gc_horizon)
-    }
-
-    pub fn gc_period(&mut self, gc_period: Duration) {
-        self.gc_period = BuilderValue::Set(gc_period)
-    }
-
     pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
         self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
     }
@@ -304,19 +231,6 @@ impl PageServerConfigBuilder {
             listen_http_addr: self
                 .listen_http_addr
                 .ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
-            checkpoint_distance: self
-                .checkpoint_distance
-                .ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
-            compaction_target_size: self
-                .compaction_target_size
-                .ok_or(anyhow::anyhow!("missing compaction_target_size"))?,
-            compaction_period: self
-                .compaction_period
-                .ok_or(anyhow::anyhow!("missing compaction_period"))?,
-            gc_horizon: self
-                .gc_horizon
-                .ok_or(anyhow::anyhow!("missing gc_horizon"))?,
-            gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
             wait_lsn_timeout: self
                 .wait_lsn_timeout
                 .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
@@ -448,13 +362,6 @@ impl PageServerConf {
             match key {
                 "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
                 "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
-                "checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
-                "compaction_target_size" => {
-                    builder.compaction_target_size(parse_toml_u64(key, item)?)
-                }
-                "compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
-                "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
-                "gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
                 "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
                 "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
                 "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
@@ -587,11 +494,6 @@ impl PageServerConf {
     pub fn dummy_conf(repo_dir: PathBuf) -> Self {
         PageServerConf {
             id: ZNodeId(0),
-            checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
-            compaction_target_size: 4 * 1024 * 1024,
-            compaction_period: Duration::from_secs(10),
-            gc_horizon: defaults::DEFAULT_GC_HORIZON,
-            gc_period: Duration::from_secs(10),
             wait_lsn_timeout: Duration::from_secs(60),
             wal_redo_timeout: Duration::from_secs(60),
             page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
@@ -658,14 +560,6 @@ mod tests {
 listen_pg_addr = '127.0.0.1:64000'
 listen_http_addr = '127.0.0.1:9898'
 
-checkpoint_distance = 111 # in bytes
-
-compaction_target_size = 111 # in bytes
-compaction_period = '111 s'
-
-gc_period = '222 s'
-gc_horizon = 222
-
 wait_lsn_timeout = '111 s'
 wal_redo_timeout = '111 s'
 
@@ -697,11 +591,6 @@ id = 10
                 id: ZNodeId(10),
                 listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
                 listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
-                checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
-                compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE,
-                compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
-                gc_horizon: defaults::DEFAULT_GC_HORIZON,
-                gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
                 wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
                 wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
                 superuser: defaults::DEFAULT_SUPERUSER.to_string(),
@@ -742,11 +631,6 @@ id = 10
                 id: ZNodeId(10),
                 listen_pg_addr: "127.0.0.1:64000".to_string(),
                 listen_http_addr: "127.0.0.1:9898".to_string(),
-                checkpoint_distance: 111,
-                compaction_target_size: 111,
-                compaction_period: Duration::from_secs(111),
-                gc_horizon: 222,
-                gc_period: Duration::from_secs(222),
                 wait_lsn_timeout: Duration::from_secs(111),
                 wal_redo_timeout: Duration::from_secs(111),
                 superuser: "zzzz".to_string(),
@@ -25,6 +25,12 @@ pub struct TenantCreateRequest {
     #[serde(default)]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub new_tenant_id: Option<ZTenantId>,
+    pub checkpoint_distance: Option<u64>,
+    pub compaction_target_size: Option<u64>,
+    pub compaction_period: Option<String>,
+    pub gc_horizon: Option<u64>,
+    pub gc_period: Option<String>,
+    pub pitr_interval: Option<String>,
 }
 
 #[serde_as]

@@ -36,3 +42,17 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
 pub struct StatusResponse {
     pub id: ZNodeId,
 }
+
+impl TenantCreateRequest {
+    pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
+        TenantCreateRequest {
+            new_tenant_id,
+            checkpoint_distance: None,
+            compaction_target_size: None,
+            compaction_period: None,
+            gc_horizon: None,
+            gc_period: None,
+            pitr_interval: None,
+        }
+    }
+}
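Since the new fields are all Option types, omitted JSON fields simply deserialize as None. A caller-side sketch using the constructor added above, overriding only two settings:

// Sketch: request that sets only two options; the remaining fields stay
// None, so the server keeps its defaults for them.
let mut req = TenantCreateRequest::new(None);
req.gc_horizon = Some(64 * 1024 * 1024);
req.pitr_interval = Some("30 days".to_string());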
@@ -333,6 +333,16 @@ paths:
                 new_tenant_id:
                   type: string
                   format: hex
+                gc_period:
+                  type: string
+                gc_horizon:
+                  type: integer
+                pitr_interval:
+                  type: string
+                checkpoint_distance:
+                  type: integer
+                compaction_period:
+                  type: string
       responses:
         "201":
           description: New tenant created successfully
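Concretely, a request body exercising the new properties could look like the illustrative JSON below (values are made up). Note that compaction_target_size exists on the Rust model but is not declared in this spec hunk:

{
  "checkpoint_distance": 268435456,
  "compaction_period": "1 s",
  "gc_horizon": 67108864,
  "gc_period": "100 s",
  "pitr_interval": "30 days"
}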
@@ -23,6 +23,7 @@ use super::models::{
 };
 use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
 use crate::repository::Repository;
+use crate::tenant_config::TenantConf;
 use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
 use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
 

@@ -290,6 +291,28 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
     let request_data: TenantCreateRequest = json_request(&mut request).await?;
     let remote_index = get_state(&request).remote_index.clone();
 
+    let mut tenant_conf = TenantConf::default();
+    if let Some(gc_period) = request_data.gc_period {
+        tenant_conf.gc_period =
+            humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?;
+    }
+    if let Some(gc_horizon) = request_data.gc_horizon {
+        tenant_conf.gc_horizon = gc_horizon;
+    }
+    if let Some(pitr_interval) = request_data.pitr_interval {
+        tenant_conf.pitr_interval =
+            humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?;
+    }
+    if let Some(checkpoint_distance) = request_data.checkpoint_distance {
+        tenant_conf.checkpoint_distance = checkpoint_distance;
+    }
+    if let Some(compaction_target_size) = request_data.compaction_target_size {
+        tenant_conf.compaction_target_size = compaction_target_size;
+    }
+    if let Some(compaction_period) = request_data.compaction_period {
+        tenant_conf.compaction_period =
+            humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?;
+    }
     let target_tenant_id = request_data
         .new_tenant_id
         .map(ZTenantId::from)

@@ -297,8 +320,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
 
     let new_tenant_id = tokio::task::spawn_blocking(move || {
         let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
+        let conf = get_config(&request);
 
-        tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index)
+        tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
     })
     .await
     .map_err(ApiError::from_err)??;
@@ -29,11 +29,13 @@ use std::ops::{Bound::Included, Deref, Range};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{self, AtomicBool};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
-use std::time::Instant;
+use std::time::{Duration, Instant, SystemTime};
 
 use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
 use crate::config::PageServerConf;
 use crate::keyspace::KeySpace;
+use crate::tenant_config::{TenantConf, TenantConfFile};
+
 use crate::page_cache;
 use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
 use crate::repository::{

@@ -117,6 +119,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
 ///
 pub struct LayeredRepository {
     pub conf: &'static PageServerConf,
+    tenant_conf: TenantConf,
+
     tenantid: ZTenantId,
     timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
     // This mutex prevents creation of new timelines during GC.

@@ -140,6 +144,10 @@ pub struct LayeredRepository {
 impl Repository for LayeredRepository {
     type Timeline = LayeredTimeline;
 
+    fn get_tenant_conf(&self) -> TenantConf {
+        self.tenant_conf
+    }
+
     fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
         let timelines = self.timelines.lock().unwrap();
         self.get_timeline_internal(timelineid, &timelines)
@@ -269,6 +277,7 @@ impl Repository for LayeredRepository {
         &self,
         target_timelineid: Option<ZTimelineId>,
         horizon: u64,
+        pitr: Duration,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult> {
         let timeline_str = target_timelineid

@@ -278,7 +287,7 @@ impl Repository for LayeredRepository {
         STORAGE_TIME
             .with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
             .observe_closure_duration(|| {
-                self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
+                self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
             })
     }
 
@@ -540,6 +549,7 @@ impl LayeredRepository {
 
     pub fn new(
         conf: &'static PageServerConf,
+        tenant_conf: TenantConf,
         walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
         tenantid: ZTenantId,
         remote_index: RemoteIndex,

@@ -548,6 +558,7 @@ impl LayeredRepository {
         LayeredRepository {
             tenantid,
             conf,
+            tenant_conf,
             timelines: Mutex::new(HashMap::new()),
             gc_cs: Mutex::new(()),
             walredo_mgr,
@@ -556,6 +567,58 @@ impl LayeredRepository {
         }
     }
 
+    /// Save tenant's config to file
+    pub fn save_tenantconf(
+        conf: &'static PageServerConf,
+        tenantid: ZTenantId,
+        tenant_conf: TenantConf,
+        first_save: bool,
+    ) -> Result<()> {
+        let _enter = info_span!("saving tenantconf").entered();
+        let path = TenantConf::tenantconf_path(conf, tenantid);
+        info!("save tenantconf to {}", path.display());
+
+        // use OpenOptions to ensure file presence is consistent with first_save
+        let mut file = VirtualFile::open_with_options(
+            &path,
+            OpenOptions::new().write(true).create_new(first_save),
+        )?;
+
+        let data = TenantConfFile::from(tenant_conf);
+        let tenantconf_bytes = data.to_bytes().context("Failed to get tenantconf bytes")?;
+
+        if file.write(&tenantconf_bytes)? != tenantconf_bytes.len() {
+            bail!("Could not write all the tenantconf bytes in a single call");
+        }
+        file.sync_all()?;
+
+        // fsync the parent directory to ensure the directory entry is durable
+        if first_save {
+            let tenant_dir = File::open(
+                &path
+                    .parent()
+                    .expect("Tenantconf should always have a parent dir"),
+            )?;
+            tenant_dir.sync_all()?;
+        }
+
+        Ok(())
+    }
+
+    pub fn load_tenantconf(
+        conf: &'static PageServerConf,
+        tenantid: ZTenantId,
+    ) -> Result<TenantConf> {
+        let path = TenantConf::tenantconf_path(conf, tenantid);
+        info!("loading tenantconf from {}", path.display());
+        let tenantconf_bytes = std::fs::read(&path)?;
+        let tenant_conf_file = TenantConfFile::from_bytes(&tenantconf_bytes);
+        match tenant_conf_file {
+            Ok(tenant_conf) => return Ok(tenant_conf.body),
+            Err(err) => return Err(err),
+        };
+    }
+
     /// Save timeline metadata to file
     fn save_metadata(
         conf: &'static PageServerConf,
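The save path follows the standard crash-safety sequence: write the bytes, fsync the file, and on first save also fsync the parent directory so the new directory entry itself is durable. A minimal standalone sketch of the same pattern using plain std::fs in place of VirtualFile (names here are illustrative, not part of the diff):

use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::Path;

// Write config bytes durably: fsync the file, and on first creation also
// fsync the containing directory so the directory entry survives a crash.
fn save_durably(path: &Path, bytes: &[u8], first_save: bool) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .write(true)
        .create_new(first_save) // fail if the file already exists on first save
        .open(path)?;
    file.write_all(bytes)?;
    file.sync_all()?;
    if first_save {
        File::open(path.parent().expect("config file has a parent dir"))?.sync_all()?;
    }
    Ok(())
}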
@@ -631,6 +694,7 @@ impl LayeredRepository {
         &self,
         target_timelineid: Option<ZTimelineId>,
         horizon: u64,
+        pitr: Duration,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult> {
         let _span_guard =

@@ -706,7 +770,7 @@ impl LayeredRepository {
                 timeline.checkpoint(CheckpointConfig::Forced)?;
                 info!("timeline {} checkpoint_before_gc done", timelineid);
             }
-            timeline.update_gc_info(branchpoints, cutoff);
+            timeline.update_gc_info(branchpoints, cutoff, pitr);
             let result = timeline.gc()?;
 
             totals += result;
@@ -823,6 +887,11 @@ struct GcInfo {
     ///
     /// FIXME: is this inclusive or exclusive?
     cutoff: Lsn,
+
+    /// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
+    /// minus 'pitr_interval'
+    ///
+    pitr: Duration,
 }
 
 /// Public interface functions

@@ -1028,6 +1097,7 @@ impl LayeredTimeline {
             gc_info: RwLock::new(GcInfo {
                 retain_lsns: Vec::new(),
                 cutoff: Lsn(0),
+                pitr: Duration::ZERO,
             }),
 
             latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
@@ -1370,9 +1440,11 @@ impl LayeredTimeline {
     ///
     pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
         let last_lsn = self.get_last_record_lsn();
+        let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
+        let tenant_conf = repo.get_tenant_conf();
+
         let distance = last_lsn.widening_sub(self.last_freeze_at.load());
-        if distance >= self.conf.checkpoint_distance.into() {
+        if distance >= tenant_conf.checkpoint_distance.into() {
             self.freeze_inmem_layer(true);
             self.last_freeze_at.store(last_lsn);
         }

@@ -1578,13 +1650,18 @@ impl LayeredTimeline {
         // above. Rewrite it.
         let _compaction_cs = self.compaction_cs.lock().unwrap();
 
-        let target_file_size = self.conf.checkpoint_distance;
+        let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
+        let tenant_conf = repo.get_tenant_conf();
+
+        let target_file_size = tenant_conf.checkpoint_distance;
 
         // Define partitioning schema if needed
         if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
         {
-            let (partitioning, lsn) =
-                pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?;
+            let (partitioning, lsn) = pgdir.repartition(
+                self.get_last_record_lsn(),
+                tenant_conf.compaction_target_size,
+            )?;
             let timer = self.create_images_time_histo.start_timer();
             // 2. Create new image layers for partitions that have been modified
             // "enough".
@@ -1810,10 +1887,11 @@ impl LayeredTimeline {
     /// the latest LSN subtracted by a constant, and doesn't do anything smart
     /// to figure out what read-only nodes might actually need.)
     ///
-    fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn) {
+    fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
         let mut gc_info = self.gc_info.write().unwrap();
         gc_info.retain_lsns = retain_lsns;
         gc_info.cutoff = cutoff;
+        gc_info.pitr = pitr;
     }
 
     ///

@@ -1824,7 +1902,7 @@ impl LayeredTimeline {
     /// obsolete.
     ///
     fn gc(&self) -> Result<GcResult> {
-        let now = Instant::now();
+        let now = SystemTime::now();
         let mut result: GcResult = Default::default();
         let disk_consistent_lsn = self.get_disk_consistent_lsn();
 
@@ -1833,6 +1911,7 @@ impl LayeredTimeline {
         let gc_info = self.gc_info.read().unwrap();
         let retain_lsns = &gc_info.retain_lsns;
         let cutoff = gc_info.cutoff;
+        let pitr = gc_info.pitr;
 
         let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
 

@@ -1850,8 +1929,9 @@ impl LayeredTimeline {
         //
         // Garbage collect the layer if all conditions are satisfied:
         // 1. it is older than cutoff LSN;
-        // 2. it doesn't need to be retained for 'retain_lsns';
-        // 3. newer on-disk image layers cover the layer's whole key range
+        // 2. it is older than PITR interval;
+        // 3. it doesn't need to be retained for 'retain_lsns';
+        // 4. newer on-disk image layers cover the layer's whole key range
         //
         let mut layers = self.layers.lock().unwrap();
         'outer: for l in layers.iter_historic_layers() {
@@ -1877,8 +1957,27 @@ impl LayeredTimeline {
                 result.layers_needed_by_cutoff += 1;
                 continue 'outer;
             }
-            // 2. Is it needed by a child branch?
+            // 2. Is it newer than the PITR interval?
+            // We use the modification time of the layer file to estimate its update time.
+            // This estimation is not quite precise, but maintaining an LSN->timestamp map seems to be overkill.
+            // It is not expected that users will need high precision here. And this estimation
+            // is conservative: the modification time of a file is always newer than the actual time of version
+            // creation. So it is safe for users.
+            //
+            if let Ok(metadata) = fs::metadata(&l.filename()) {
+                let last_modified = metadata.modified()?;
+                if now.duration_since(last_modified)? < pitr {
+                    debug!(
+                        "keeping {} because its modification time {:?} is newer than PITR {:?}",
+                        l.filename().display(),
+                        last_modified,
+                        pitr
+                    );
+                    result.layers_needed_by_pitr += 1;
+                    continue 'outer;
+                }
+            }
+            // 3. Is it needed by a child branch?
             // NOTE With that we would keep data that
             // might be referenced by child branches forever.
             // We can track this in child timeline GC and delete parent layers when
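One subtlety: SystemTime::duration_since returns Err when the file's mtime lies in the future of now (e.g. after clock skew), which the ? above turns into a failed GC iteration. A standalone version of the retention predicate, with an Err arm that conservatively keeps such layers (that arm is an assumption of this sketch, not what the diff does):

use std::time::{Duration, SystemTime};

// A layer is still protected by PITR if its mtime is within `pitr` of `now`.
// duration_since errors if `last_modified` is ahead of `now` (clock skew).
fn within_pitr(now: SystemTime, last_modified: SystemTime, pitr: Duration) -> bool {
    match now.duration_since(last_modified) {
        Ok(age) => age < pitr,
        Err(_) => true, // mtime in the future: conservatively keep the layer
    }
}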
@@ -1897,7 +1996,7 @@ impl LayeredTimeline {
                 }
             }
 
-            // 3. Is there a later on-disk layer for this relation?
+            // 4. Is there a later on-disk layer for this relation?
             //
             // The end-LSN is exclusive, while disk_consistent_lsn is
             // inclusive. For example, if disk_consistent_lsn is 100, it is

@@ -1938,7 +2037,7 @@ impl LayeredTimeline {
             result.layers_removed += 1;
         }
 
-        result.elapsed = now.elapsed();
+        result.elapsed = now.elapsed()?;
         Ok(result)
     }
 
@@ -2215,7 +2314,8 @@ pub mod tests {
         }
 
         let cutoff = tline.get_last_record_lsn();
-        tline.update_gc_info(Vec::new(), cutoff);
+
+        tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
         tline.checkpoint(CheckpointConfig::Forced)?;
         tline.compact()?;
         tline.gc()?;

@@ -2285,7 +2385,7 @@ pub mod tests {
         // Perform a cycle of checkpoint, compaction, and GC
        println!("checkpointing {}", lsn);
         let cutoff = tline.get_last_record_lsn();
-        tline.update_gc_info(Vec::new(), cutoff);
+        tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
         tline.checkpoint(CheckpointConfig::Forced)?;
         tline.compact()?;
         tline.gc()?;

@@ -2362,7 +2462,7 @@ pub mod tests {
         // Perform a cycle of checkpoint, compaction, and GC
         println!("checkpointing {}", lsn);
         let cutoff = tline.get_last_record_lsn();
-        tline.update_gc_info(Vec::new(), cutoff);
+        tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
         tline.checkpoint(CheckpointConfig::Forced)?;
         tline.compact()?;
         tline.gc()?;
@@ -10,6 +10,7 @@ pub mod pgdatadir_mapping;
 pub mod reltag;
 pub mod remote_storage;
 pub mod repository;
+pub mod tenant_config;
 pub mod tenant_mgr;
 pub mod tenant_threads;
 pub mod thread_mgr;
@@ -19,6 +19,7 @@ use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
 use std::sync::{Arc, RwLockReadGuard};
+use std::time::Duration;
 use tracing::*;
 use zenith_metrics::{register_histogram_vec, HistogramVec};
 use zenith_utils::auth::{self, JwtAuth};
@@ -672,6 +673,37 @@ impl postgres_backend::Handler for PageServerHandler {
                 }
             }
             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+        } else if query_string.starts_with("show ") {
+            // show <tenant_id>
+            let (_, params_raw) = query_string.split_at("show ".len());
+            let params = params_raw.split(' ').collect::<Vec<_>>();
+            ensure!(params.len() == 1, "invalid param number for config command");
+            let tenantid = ZTenantId::from_str(params[0])?;
+            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+            let tenant_conf = repo.get_tenant_conf();
+            pgb.write_message_noflush(&BeMessage::RowDescription(&[
+                RowDescriptor::int8_col(b"checkpoint_distance"),
+                RowDescriptor::int8_col(b"compaction_target_size"),
+                RowDescriptor::int8_col(b"compaction_period"),
+                RowDescriptor::int8_col(b"gc_horizon"),
+                RowDescriptor::int8_col(b"gc_period"),
+                RowDescriptor::int8_col(b"pitr_interval"),
+            ]))?
+            .write_message_noflush(&BeMessage::DataRow(&[
+                Some(tenant_conf.checkpoint_distance.to_string().as_bytes()),
+                Some(tenant_conf.compaction_target_size.to_string().as_bytes()),
+                Some(
+                    tenant_conf
+                        .compaction_period
+                        .as_secs()
+                        .to_string()
+                        .as_bytes(),
+                ),
+                Some(tenant_conf.gc_horizon.to_string().as_bytes()),
+                Some(tenant_conf.gc_period.as_secs().to_string().as_bytes()),
+                Some(tenant_conf.pitr_interval.as_secs().to_string().as_bytes()),
+            ]))?
+            .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.starts_with("do_gc ") {
             // Run GC immediately on given timeline.
             // FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
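This `show` command travels over the pageserver's libpq listener, so it can be exercised from psql like the existing do_gc test hook. An illustrative session against the defaults (address, port, and tenant id are made up; durations are reported in whole seconds):

$ psql -h 127.0.0.1 -p 64000 -c 'show 0b6d384d6a7f65a11a521b53233a12f0'
 checkpoint_distance | compaction_target_size | compaction_period | gc_horizon | gc_period | pitr_interval
---------------------+------------------------+-------------------+------------+-----------+---------------
           268435456 |              134217728 |                 1 |   67108864 |       100 |       2592000
(1 row)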
@@ -689,16 +721,21 @@ impl postgres_backend::Handler for PageServerHandler {
 
             let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
             let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
+
+            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+            let tenant_conf = repo.get_tenant_conf();
+
             let gc_horizon: u64 = caps
                 .get(4)
                 .map(|h| h.as_str().parse())
-                .unwrap_or(Ok(self.conf.gc_horizon))?;
+                .unwrap_or(Ok(tenant_conf.gc_horizon))?;
 
             let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-            let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
+            let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
             pgb.write_message_noflush(&BeMessage::RowDescription(&[
                 RowDescriptor::int8_col(b"layers_total"),
                 RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
+                RowDescriptor::int8_col(b"layers_needed_by_pitr"),
                 RowDescriptor::int8_col(b"layers_needed_by_branches"),
                 RowDescriptor::int8_col(b"layers_not_updated"),
                 RowDescriptor::int8_col(b"layers_removed"),
@@ -707,6 +744,7 @@ impl postgres_backend::Handler for PageServerHandler {
             .write_message_noflush(&BeMessage::DataRow(&[
                 Some(result.layers_total.to_string().as_bytes()),
                 Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
+                Some(result.layers_needed_by_pitr.to_string().as_bytes()),
                 Some(result.layers_needed_by_branches.to_string().as_bytes()),
                 Some(result.layers_not_updated.to_string().as_bytes()),
                 Some(result.layers_removed.to_string().as_bytes()),
@@ -1,5 +1,6 @@
 use crate::layered_repository::metadata::TimelineMetadata;
 use crate::remote_storage::RemoteIndex;
+use crate::tenant_config::TenantConf;
 use crate::walrecord::ZenithWalRecord;
 use crate::CheckpointConfig;
 use anyhow::{bail, Result};
@@ -249,6 +250,7 @@ pub trait Repository: Send + Sync {
         &self,
         timelineid: Option<ZTimelineId>,
         horizon: u64,
+        pitr: Duration,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult>;
 

@@ -261,6 +263,8 @@ pub trait Repository: Send + Sync {
 
     // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
     fn get_remote_index(&self) -> &RemoteIndex;
+
+    fn get_tenant_conf(&self) -> TenantConf;
 }
 
 /// A timeline, that belongs to the current repository.
@@ -303,6 +307,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
 pub struct GcResult {
     pub layers_total: u64,
     pub layers_needed_by_cutoff: u64,
+    pub layers_needed_by_pitr: u64,
     pub layers_needed_by_branches: u64,
     pub layers_not_updated: u64,
     pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.

@@ -313,6 +318,7 @@ pub struct GcResult {
 impl AddAssign for GcResult {
     fn add_assign(&mut self, other: Self) {
         self.layers_total += other.layers_total;
+        self.layers_needed_by_pitr += other.layers_needed_by_pitr;
         self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
         self.layers_needed_by_branches += other.layers_needed_by_branches;
         self.layers_not_updated += other.layers_not_updated;
@@ -454,6 +460,7 @@ pub mod repo_harness {
 
     pub struct RepoHarness<'a> {
         pub conf: &'static PageServerConf,
+        pub tenant_conf: TenantConf,
        pub tenant_id: ZTenantId,
 
         pub lock_guard: (

@@ -485,12 +492,15 @@ pub mod repo_harness {
         // OK in a test.
         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
 
+        let tenant_conf = TenantConf::dummy_conf();
+
         let tenant_id = ZTenantId::generate();
         fs::create_dir_all(conf.tenant_path(&tenant_id))?;
         fs::create_dir_all(conf.timelines_path(&tenant_id))?;
 
         Ok(Self {
             conf,
+            tenant_conf,
             tenant_id,
             lock_guard,
         })

@@ -505,6 +515,7 @@ pub mod repo_harness {
 
         let repo = LayeredRepository::new(
             self.conf,
+            self.tenant_conf,
             walredo_mgr,
             self.tenant_id,
             RemoteIndex::empty(),
@@ -720,7 +731,7 @@ mod tests {
         // FIXME: this doesn't actually remove any layer currently, given how the checkpointing
         // and compaction works. But it does set the 'cutoff' point so that the cross check
         // below should fail.
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
 
         // try to branch at lsn 25, should fail because we already garbage collected the data
         match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {

@@ -771,7 +782,7 @@ mod tests {
         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
         make_some_layers(tline.as_ref(), Lsn(0x20))?;
 
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
         match tline.get(*TEST_KEY, Lsn(0x25)) {

@@ -794,7 +805,7 @@ mod tests {
             .get_timeline_load(NEW_TIMELINE_ID)
             .expect("Should have a local timeline");
         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
         assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
 
         Ok(())

@@ -813,7 +824,7 @@ mod tests {
         make_some_layers(newtline.as_ref(), Lsn(0x60))?;
 
         // run gc on parent
-        repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
+        repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
 
         // Check that the data is still accessible on the branch.
         assert_eq!(
pageserver/src/tenant_config.rs (new file, 220 lines)
@@ -0,0 +1,220 @@
|
|||||||
|
//! Functions for handling per-tenant configuration options
|
||||||
|
//!
|
||||||
|
//! If tenant is created with --config option,
|
||||||
|
//! the tenant-specific config will be stored in tenant's directory.
|
||||||
|
//! Otherwise, global pageserver's config is used.
|
||||||
|
//!
|
||||||
|
//! If the tenant config file is corrupted, the tenant will be disabled.
|
||||||
|
//! We cannot use global or default config instead, because wrong settings
|
||||||
|
//! may lead to a data loss.
|
||||||
|
//!
|
||||||
|
use crate::config::PageServerConf;
|
||||||
|
use crate::STORAGE_FORMAT_VERSION;
|
||||||
|
use anyhow::ensure;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::time::Duration;
|
||||||
|
use zenith_utils::bin_ser::BeSer;
|
||||||
|
use zenith_utils::zid::ZTenantId;
|
||||||
|
|
||||||
|
pub const TENANT_CONFIG_NAME: &str = "config";
|
||||||
|
|
||||||
|
pub mod defaults {
|
||||||
|
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||||
|
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||||
|
// which is good for now to trigger bugs.
|
||||||
|
// This parameter actually determines L0 layer file size.
|
||||||
|
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||||
|
|
||||||
|
// Target file size, when creating image and delta layers.
|
||||||
|
// This parameter determines L1 layer file size.
|
||||||
|
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||||
|
|
||||||
|
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
|
||||||
|
|
||||||
|
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||||
|
pub const DEFAULT_GC_PERIOD: &str = "100 s";
|
||||||
|
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct TenantConf {
|
||||||
|
pub checkpoint_distance: u64,
|
||||||
|
pub compaction_target_size: u64,
|
||||||
|
pub compaction_period: Duration,
|
||||||
|
pub gc_horizon: u64,
|
||||||
|
pub gc_period: Duration,
|
||||||
|
pub pitr_interval: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// We assume that a write of up to TENANTCONF_MAX_SIZE bytes is atomic.
|
||||||
|
///
|
||||||
|
/// This is the same assumption that PostgreSQL makes with the control file,
|
||||||
|
/// see PG_CONTROL_MAX_SAFE_SIZE
|
||||||
|
const TENANTCONF_MAX_SIZE: usize = 512;
|
||||||
|
|
||||||
|
/// TenantConfFile is stored on disk in tenant's directory
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub struct TenantConfFile {
|
||||||
|
hdr: TenantConfHeader,
|
||||||
|
pub body: TenantConf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
struct TenantConfHeader {
|
||||||
|
checksum: u32, // CRC of serialized tenantconf body
|
||||||
|
size: u16, // size of serialized tenantconf
|
||||||
|
format_version: u16, // storage format version (used for compatibility checks)
|
||||||
|
}
|
||||||
|
const TENANTCONF_HDR_SIZE: usize = std::mem::size_of::<TenantConfHeader>();
|
||||||
|
|
||||||
|
impl TenantConfFile {
|
||||||
|
pub fn new(
|
||||||
|
checkpoint_distance: u64,
|
||||||
|
compaction_target_size: u64,
|
||||||
|
compaction_period: Duration,
|
||||||
|
gc_horizon: u64,
|
||||||
|
gc_period: Duration,
|
||||||
|
pitr_interval: Duration,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
hdr: TenantConfHeader {
|
||||||
|
checksum: 0,
|
||||||
|
size: 0,
|
||||||
|
format_version: STORAGE_FORMAT_VERSION,
|
||||||
|
},
|
||||||
|
body: TenantConf {
|
||||||
|
gc_period,
|
||||||
|
gc_horizon,
|
||||||
|
pitr_interval,
|
||||||
|
checkpoint_distance,
|
||||||
|
compaction_period,
|
||||||
|
compaction_target_size,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from(tconf: TenantConf) -> Self {
|
||||||
|
Self {
|
||||||
|
hdr: TenantConfHeader {
|
||||||
|
checksum: 0,
|
||||||
|
size: 0,
|
||||||
|
format_version: STORAGE_FORMAT_VERSION,
|
||||||
|
},
|
||||||
|
body: TenantConf {
|
||||||
|
gc_period: tconf.gc_period,
|
||||||
|
gc_horizon: tconf.gc_horizon,
|
||||||
|
pitr_interval: tconf.pitr_interval,
|
||||||
|
checkpoint_distance: tconf.checkpoint_distance,
|
||||||
|
compaction_period: tconf.compaction_period,
|
||||||
|
compaction_target_size: tconf.compaction_target_size,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
    pub fn from_bytes(tenantconf_bytes: &[u8]) -> anyhow::Result<Self> {
        ensure!(
            tenantconf_bytes.len() == TENANTCONF_MAX_SIZE,
            "tenantconf bytes size is wrong"
        );
        let hdr = TenantConfHeader::des(&tenantconf_bytes[0..TENANTCONF_HDR_SIZE])?;
        ensure!(
            hdr.format_version == STORAGE_FORMAT_VERSION,
            "format version mismatch"
        );
        let tenantconf_size = hdr.size as usize;
        ensure!(
            tenantconf_size <= TENANTCONF_MAX_SIZE,
            "corrupted tenantconf file"
        );
        let calculated_checksum =
            crc32c::crc32c(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size]);
        ensure!(
            hdr.checksum == calculated_checksum,
            "tenantconf checksum mismatch"
        );
        let body = TenantConf::des(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size])?;

        Ok(TenantConfFile { hdr, body })
    }

    pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
        let body_bytes = self.body.ser()?;
        let tenantconf_size = TENANTCONF_HDR_SIZE + body_bytes.len();
        let hdr = TenantConfHeader {
            size: tenantconf_size as u16,
            format_version: STORAGE_FORMAT_VERSION,
            checksum: crc32c::crc32c(&body_bytes),
        };
        let hdr_bytes = hdr.ser()?;
        let mut tenantconf_bytes = vec![0u8; TENANTCONF_MAX_SIZE];
        tenantconf_bytes[0..TENANTCONF_HDR_SIZE].copy_from_slice(&hdr_bytes);
        tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size].copy_from_slice(&body_bytes);
        Ok(tenantconf_bytes)
    }
}

impl TenantConf {
    pub fn default() -> TenantConf {
        use defaults::*;

        TenantConf {
            checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
            compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
            compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
                .expect("cannot parse default compaction period"),
            gc_horizon: DEFAULT_GC_HORIZON,
            gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
                .expect("cannot parse default gc period"),
            pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
                .expect("cannot parse default PITR interval"),
        }
    }

    /// Points to the place in the pageserver's local directory
    /// where the given tenant's tenantconf file should be located.
    pub fn tenantconf_path(conf: &'static PageServerConf, tenantid: ZTenantId) -> PathBuf {
        conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME)
    }

    #[cfg(test)]
    pub fn dummy_conf() -> Self {
        TenantConf {
            checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
            compaction_target_size: 4 * 1024 * 1024,
            compaction_period: Duration::from_secs(10),
            gc_horizon: defaults::DEFAULT_GC_HORIZON,
            gc_period: Duration::from_secs(10),
            pitr_interval: Duration::from_secs(60 * 60),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn tenantconf_serializes_correctly() {
        let original_tenantconf = TenantConfFile::new(
            111,
            111,
            Duration::from_secs(111),
            222,
            Duration::from_secs(111),
            Duration::from_secs(60 * 60),
        );

        let tenantconf_bytes = original_tenantconf
            .to_bytes()
            .expect("Should serialize correct tenantconf to bytes");

        let deserialized_tenantconf = TenantConfFile::from_bytes(&tenantconf_bytes)
            .expect("Should deserialize its own bytes");

        assert_eq!(
            deserialized_tenantconf.body, original_tenantconf.body,
            "Tenantconf that was serialized to bytes and deserialized back should not change"
        );
    }
}
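The fixed-size TENANTCONF_MAX_SIZE buffer exists so the whole config fits in a single
write, which the header comment above assumes to be atomic. A minimal sketch of how the
buffer produced by to_bytes() could be persisted and read back (hypothetical helper
names; the actual entry points in this change are LayeredRepository::save_tenantconf and
load_tenantconf, whose bodies are not part of this diff):

    use std::fs::File;
    use std::io::{Read, Write};
    use std::path::Path;

    // Hypothetical sketch, not code from this commit.
    fn write_tenantconf_file(path: &Path, conf_file: &TenantConfFile) -> anyhow::Result<()> {
        // to_bytes() always returns exactly TENANTCONF_MAX_SIZE (512) bytes,
        // so the file contents are replaced in one fixed-size write.
        let bytes = conf_file.to_bytes()?;
        let mut file = File::create(path)?;
        file.write_all(&bytes)?;
        file.sync_all()?;
        Ok(())
    }

    fn read_tenantconf_file(path: &Path) -> anyhow::Result<TenantConfFile> {
        let mut bytes = Vec::new();
        File::open(path)?.read_to_end(&mut bytes)?;
        // from_bytes() re-verifies the length, format version, and CRC.
        TenantConfFile::from_bytes(&bytes)
    }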
@@ -5,6 +5,7 @@ use crate::config::PageServerConf;
 use crate::layered_repository::LayeredRepository;
 use crate::remote_storage::RemoteIndex;
 use crate::repository::{Repository, TimelineSyncStatusUpdate};
+use crate::tenant_config::TenantConf;
 use crate::thread_mgr;
 use crate::thread_mgr::ThreadKind;
 use crate::timelines;
@@ -63,7 +64,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
     TENANTS.lock().unwrap()
 }
 
-// Sets up wal redo manager and repository for tenant. Reduces code duplocation.
+// Sets up wal redo manager and repository for tenant. Reduces code duplication.
 // Used during pageserver startup, or when new tenant is attached to pageserver.
 pub fn load_local_repo(
     conf: &'static PageServerConf,
@@ -75,9 +76,15 @@ pub fn load_local_repo(
     // Set up a WAL redo manager, for applying WAL records.
     let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
 
+    // Try to load config file
+    let tenant_conf = LayeredRepository::load_tenantconf(conf, tenant_id)
+        .with_context(|| format!("Failed to load tenant state for id {}", tenant_id))
+        .unwrap();
+
     // Set up an object repository, for actual data storage.
     let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
         conf,
+        tenant_conf,
         Arc::new(walredo_mgr),
         tenant_id,
         remote_index.clone(),
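Note that the new loading code pairs with_context() with unwrap(), so a missing or
corrupt tenant config panics the loading path and the context only surfaces in the panic
message. If load_local_repo returned anyhow::Result (an assumption, its full signature
is not shown in this hunk), the conventional form would be:

    let tenant_conf = LayeredRepository::load_tenantconf(conf, tenant_id)
        .with_context(|| format!("Failed to load tenant state for id {}", tenant_id))?;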
@@ -174,6 +181,7 @@ pub fn shutdown_all_tenants() {
 
 pub fn create_tenant_repository(
     conf: &'static PageServerConf,
+    tenant_conf: TenantConf,
     tenantid: ZTenantId,
     remote_index: RemoteIndex,
 ) -> Result<Option<ZTenantId>> {
@@ -186,6 +194,7 @@ pub fn create_tenant_repository(
     let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
     let repo = timelines::create_repo(
         conf,
+        tenant_conf,
         tenantid,
         CreateRepo::Real {
             wal_redo_manager,
@@ -210,7 +219,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
 /// Change the state of a tenant to Active and launch its compactor and GC
 /// threads. If the tenant was already in Active state or Stopping, does nothing.
 ///
-pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
+pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
     let mut m = access_tenants();
     let tenant = m
         .get_mut(&tenant_id)
@@ -230,7 +239,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
         None,
         "Compactor thread",
         true,
-        move || crate::tenant_threads::compact_loop(tenant_id, conf),
+        move || crate::tenant_threads::compact_loop(tenant_id),
     )?;
 
     let gc_spawn_result = thread_mgr::spawn(
@@ -239,7 +248,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
         None,
         "GC thread",
         true,
-        move || crate::tenant_threads::gc_loop(tenant_id, conf),
+        move || crate::tenant_threads::gc_loop(tenant_id),
     )
     .with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
 
@@ -251,7 +260,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
         thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
         return gc_spawn_result;
     }
 
     tenant.state = TenantState::Active;
 }
 
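Taken together, these hunks drop the global PageServerConf parameter from tenant
activation: the compactor and GC settings are now read through the tenant's own
repository (via get_tenant_conf(), see the tenant_threads changes below), so the spawned
closures only need to capture the tenant id.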
@@ -290,7 +298,7 @@ pub fn get_timeline_for_tenant_load(
         .get_timeline_load(timelineid)
         .with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?;
 
-    let repartition_distance = tenant.repo.conf.checkpoint_distance / 10;
+    let repartition_distance = tenant.repo.get_tenant_conf().checkpoint_distance / 10;
 
     let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
     page_tline.init_logical_size()?;
@@ -1,6 +1,5 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
-use crate::config::PageServerConf;
 use crate::repository::Repository;
 use crate::tenant_mgr;
 use crate::tenant_mgr::TenantState;
@@ -12,8 +11,8 @@ use zenith_utils::zid::ZTenantId;
 ///
 /// Compaction thread's main loop
 ///
-pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
-    if let Err(err) = compact_loop_ext(tenantid, conf) {
+pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
+    if let Err(err) = compact_loop_ext(tenantid) {
         error!("compact loop terminated with error: {:?}", err);
         Err(err)
     } else {
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
     }
 }
 
-fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
     loop {
         if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
             break;
         }
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        let tenant_conf = repo.get_tenant_conf();
 
-        std::thread::sleep(conf.compaction_period);
+        std::thread::sleep(tenant_conf.compaction_period);
         trace!("compaction thread for tenant {} waking up", tenantid);
 
         // Compact timelines
@@ -46,23 +47,29 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
 ///
 /// GC thread's main loop
 ///
-pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
     loop {
         if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
             break;
         }
 
         trace!("gc thread for tenant {} waking up", tenantid);
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        let tenant_conf = repo.get_tenant_conf();
 
         // Garbage collect old files that are not needed for PITR anymore
-        if conf.gc_horizon > 0 {
-            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-            repo.gc_iteration(None, conf.gc_horizon, false)?;
+        if tenant_conf.gc_horizon > 0 {
+            repo.gc_iteration(
+                None,
+                tenant_conf.gc_horizon,
+                tenant_conf.pitr_interval,
+                false,
+            )?;
         }
 
         // TODO Write it in more adequate way using
         // condvar.wait_timeout() or something
-        let mut sleep_time = conf.gc_period.as_secs();
+        let mut sleep_time = tenant_conf.gc_period.as_secs();
         while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
         {
             sleep_time -= 1;
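The TODO above about condvar.wait_timeout() refers to the standard pattern for an
interruptible sleep. A minimal sketch, assuming a shared shutdown flag that this change
does not actually introduce:

    use std::sync::{Condvar, Mutex};
    use std::time::Duration;

    // Hypothetical: a (Mutex<bool>, Condvar) pair shared with the shutdown path
    // could replace the one-second polling loop above.
    fn sleep_or_wake(pair: &(Mutex<bool>, Condvar), period: Duration) {
        let (lock, cvar) = pair;
        let guard = lock.lock().unwrap();
        // Returns early if the shutdown path calls notify_all().
        let _ = cvar.wait_timeout(guard, period).unwrap();
    }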
@@ -23,6 +23,7 @@ use crate::{
     layered_repository::metadata::TimelineMetadata,
     remote_storage::RemoteIndex,
     repository::{LocalTimelineState, Repository},
+    tenant_config::TenantConf,
     DatadirTimeline, RepositoryImpl,
 };
 use crate::{import_datadir, LOG_FILE_NAME};
@@ -149,8 +150,8 @@ pub fn init_pageserver(
 
     if let Some(tenant_id) = create_tenant {
         println!("initializing tenantid {}", tenant_id);
-        let repo =
-            create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
+        let repo = create_repo(conf, TenantConf::default(), tenant_id, CreateRepo::Dummy)
+            .context("failed to create repo")?;
         let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
         bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
             .context("failed to create initial timeline")?;
@@ -173,6 +174,7 @@ pub enum CreateRepo {
 
 pub fn create_repo(
     conf: &'static PageServerConf,
+    tenant_conf: TenantConf,
     tenant_id: ZTenantId,
     create_repo: CreateRepo,
 ) -> Result<Arc<RepositoryImpl>> {
@@ -209,8 +211,12 @@ pub fn create_repo(
     crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
     info!("created directory structure in {}", repo_dir.display());
 
+    // Save tenant's config
+    LayeredRepository::save_tenantconf(conf, tenant_id, tenant_conf, true)?;
+
     Ok(Arc::new(LayeredRepository::new(
         conf,
+        tenant_conf,
         wal_redo_manager,
         tenant_id,
         remote_index,
@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
             receivers.insert((tenantid, timelineid), receiver);
 
             // Update tenant state and start tenant threads, if they are not running yet.
-            tenant_mgr::activate_tenant(conf, tenantid)?;
+            tenant_mgr::activate_tenant(tenantid)?;
         }
     };
     Ok(())
test_runner/batch_others/test_tenant_conf.py (new file, 31 lines)
@@ -0,0 +1,31 @@
from contextlib import closing

import pytest

from fixtures.zenith_fixtures import ZenithEnvBuilder


def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
    """Test per-tenant configuration"""
    env = zenith_env_builder.init_start()
    tenant = env.zenith_cli.create_tenant(
        conf={
            'gc_period': '100sec',
            'gc_horizon': '1024',
            'pitr_interval': '3600sec',
            'checkpoint_distance': '10000',
            'compaction_period': '60sec',
            'compaction_target_size': '1048576'
        })

    env.zenith_cli.create_timeline('test_tenant_conf', tenant_id=tenant)
    pg = env.postgres.create_start(
        "test_tenant_conf",
        "main",
        tenant,
    )

    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor() as pscur:
            pscur.execute(f"show {tenant.hex}")
            assert pscur.fetchone() == (10000, 1048576, 60, 1024, 100, 3600)
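The asserted tuple follows directly from the conf dict above: the "show {tenant.hex}"
query is expected to report checkpoint_distance (10000), compaction_target_size
(1048576), compaction_period in seconds (60), gc_horizon (1024), gc_period in seconds
(100), and pitr_interval in seconds (3600), in that order.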
@@ -853,13 +853,20 @@ class ZenithCli:
         self.env = env
         pass
 
-    def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
+    def create_tenant(self,
+                      tenant_id: Optional[uuid.UUID] = None,
+                      conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
         """
         Creates a new tenant and returns its id.
         """
         if tenant_id is None:
             tenant_id = uuid.uuid4()
-        res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
+        if conf is None:
+            res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
+        else:
+            res = self.raw_cli(
+                ['tenant', 'create', '--tenant-id', tenant_id.hex] +
+                sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
         res.check_returncode()
         return tenant_id
 
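For example, conf={'gc_horizon': '1024', 'gc_period': '100sec'} makes the fixture invoke
roughly: tenant create --tenant-id <hex> -c gc_horizon:1024 -c gc_period:100sec, with
each key/value pair flattened into its own -c flag by the sum(...) expression above.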
@@ -164,7 +164,8 @@ fn main() -> Result<()> {
             .subcommand(App::new("create")
                 .arg(tenant_id_arg.clone())
                 .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
+                .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
             )
         )
         .subcommand(
             App::new("pageserver")
@@ -521,8 +522,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
         }
         Some(("create", create_match)) => {
             let initial_tenant_id = parse_tenant_id(create_match)?;
+            let tenant_conf: HashMap<_, _> = create_match
+                .values_of("config")
+                .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
+                .unwrap_or_default();
             let new_tenant_id = pageserver
-                .tenant_create(initial_tenant_id)?
+                .tenant_create(initial_tenant_id, tenant_conf)?
                 .ok_or_else(|| {
                     anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
                 })?;
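On the receiving side, each -c value is split once on ':' and collected into the HashMap
that is forwarded to tenant_create(); values without a colon yield None from split_once
and are silently dropped by the flat_map. For illustration:

    // split_once(':') returns None when no colon is present, so flat_map skips it:
    assert_eq!("gc_horizon:1024".split_once(':'), Some(("gc_horizon", "1024")));
    assert_eq!("gc_horizon=1024".split_once(':'), None);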