mirror of https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00

Compare commits: conrad/pro ... tenant_con (9 commits)

| SHA1 |
|---|
| 4d3b15fc3e |
| c93d31f9db |
| 47ece2be8b |
| 3da8233f08 |
| 51c7ceb1d9 |
| 5169d664c5 |
| 1bb364b5bc |
| 7e6eff4969 |
| baac8ac410 |
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;

@@ -342,10 +343,25 @@ impl PageServerNode {
pub fn tenant_create(
&self,
new_tenant_id: Option<ZTenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest { new_tenant_id })
.json(&TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>().unwrap()),
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>().unwrap()),
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
gc_horizon: settings
.get("gc_horizon")
.map(|x| x.parse::<u64>().unwrap()),
gc_period: settings.get("gc_period").map(|x| x.to_string()),
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
})
.send()?
.error_from_body()?
.json::<Option<String>>()?;
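An editorial aside, not part of the diff: a minimal sketch of how the new client call might be used, assuming a PageServerNode is at hand. Note that the parse::<u64>().unwrap() calls above panic on non-numeric values, so callers are expected to pass well-formed settings.

    use std::collections::HashMap;

    // Hypothetical caller; mirrors what `zenith tenant create -c key:value` ends up doing.
    fn create_tenant_with_overrides(pageserver: &PageServerNode) -> anyhow::Result<Option<ZTenantId>> {
        let mut settings: HashMap<&str, &str> = HashMap::new();
        settings.insert("gc_horizon", "1024");
        settings.insert("pitr_interval", "3600 s");
        // Passing None lets the pageserver generate a fresh tenant id.
        pageserver.tenant_create(None, settings)
    }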
@@ -27,21 +27,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
pub const DEFAULT_GC_PERIOD: &str = "100 s";
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
|
||||
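Reader's note: the per-tenant knobs removed from this defaults module (checkpoint_distance, compaction_target_size, compaction_period, gc_horizon and gc_period) are not gone; they reappear, together with a new DEFAULT_PITR_INTERVAL, in the new pageserver/src/tenant_config.rs further down in this compare.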
@@ -62,13 +47,6 @@ pub mod defaults {
|
||||
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
|
||||
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
|
||||
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
|
||||
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
|
||||
|
||||
#gc_period = '{DEFAULT_GC_PERIOD}'
|
||||
#gc_horizon = {DEFAULT_GC_HORIZON}
|
||||
|
||||
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
|
||||
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
|
||||
|
||||
@@ -94,22 +72,6 @@ pub struct PageServerConf {
|
||||
/// Example (default): 127.0.0.1:9898
|
||||
pub listen_http_addr: String,
|
||||
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
// page server crashes.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub checkpoint_distance: u64,
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub compaction_target_size: u64,
|
||||
|
||||
// How often to check if there's compaction work to be done.
|
||||
pub compaction_period: Duration,
|
||||
|
||||
pub gc_horizon: u64,
|
||||
pub gc_period: Duration,
|
||||
|
||||
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
|
||||
pub wait_lsn_timeout: Duration,
|
||||
// How long to wait for WAL redo to complete.
|
||||
@@ -158,14 +120,6 @@ struct PageServerConfigBuilder {
|
||||
|
||||
listen_http_addr: BuilderValue<String>,
|
||||
|
||||
checkpoint_distance: BuilderValue<u64>,
|
||||
|
||||
compaction_target_size: BuilderValue<u64>,
|
||||
compaction_period: BuilderValue<Duration>,
|
||||
|
||||
gc_horizon: BuilderValue<u64>,
|
||||
gc_period: BuilderValue<Duration>,
|
||||
|
||||
wait_lsn_timeout: BuilderValue<Duration>,
|
||||
wal_redo_timeout: BuilderValue<Duration>,
|
||||
|
||||
@@ -194,13 +148,6 @@ impl Default for PageServerConfigBuilder {
|
||||
Self {
|
||||
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
|
||||
compaction_target_size: Set(DEFAULT_COMPACTION_TARGET_SIZE),
|
||||
compaction_period: Set(humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period")),
|
||||
gc_horizon: Set(DEFAULT_GC_HORIZON),
|
||||
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period")),
|
||||
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
@@ -229,26 +176,6 @@ impl PageServerConfigBuilder {
|
||||
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
|
||||
}
|
||||
|
||||
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
|
||||
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn compaction_target_size(&mut self, compaction_target_size: u64) {
|
||||
self.compaction_target_size = BuilderValue::Set(compaction_target_size)
|
||||
}
|
||||
|
||||
pub fn compaction_period(&mut self, compaction_period: Duration) {
|
||||
self.compaction_period = BuilderValue::Set(compaction_period)
|
||||
}
|
||||
|
||||
pub fn gc_horizon(&mut self, gc_horizon: u64) {
|
||||
self.gc_horizon = BuilderValue::Set(gc_horizon)
|
||||
}
|
||||
|
||||
pub fn gc_period(&mut self, gc_period: Duration) {
|
||||
self.gc_period = BuilderValue::Set(gc_period)
|
||||
}
|
||||
|
||||
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
|
||||
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
|
||||
}
|
||||
@@ -304,19 +231,6 @@ impl PageServerConfigBuilder {
|
||||
listen_http_addr: self
|
||||
.listen_http_addr
|
||||
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
|
||||
checkpoint_distance: self
|
||||
.checkpoint_distance
|
||||
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
|
||||
compaction_target_size: self
|
||||
.compaction_target_size
|
||||
.ok_or(anyhow::anyhow!("missing compaction_target_size"))?,
|
||||
compaction_period: self
|
||||
.compaction_period
|
||||
.ok_or(anyhow::anyhow!("missing compaction_period"))?,
|
||||
gc_horizon: self
|
||||
.gc_horizon
|
||||
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
|
||||
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
|
||||
wait_lsn_timeout: self
|
||||
.wait_lsn_timeout
|
||||
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
|
||||
@@ -448,13 +362,6 @@ impl PageServerConf {
|
||||
match key {
|
||||
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
|
||||
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
|
||||
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
|
||||
"compaction_target_size" => {
|
||||
builder.compaction_target_size(parse_toml_u64(key, item)?)
|
||||
}
|
||||
"compaction_period" => builder.compaction_period(parse_toml_duration(key, item)?),
|
||||
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
|
||||
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
|
||||
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
|
||||
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
|
||||
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
|
||||
@@ -587,11 +494,6 @@ impl PageServerConf {
|
||||
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
|
||||
PageServerConf {
|
||||
id: ZNodeId(0),
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
compaction_target_size: 4 * 1024 * 1024,
|
||||
compaction_period: Duration::from_secs(10),
|
||||
gc_horizon: defaults::DEFAULT_GC_HORIZON,
|
||||
gc_period: Duration::from_secs(10),
|
||||
wait_lsn_timeout: Duration::from_secs(60),
|
||||
wal_redo_timeout: Duration::from_secs(60),
|
||||
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
|
||||
@@ -658,14 +560,6 @@ mod tests {
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
|
||||
checkpoint_distance = 111 # in bytes
|
||||
|
||||
compaction_target_size = 111 # in bytes
|
||||
compaction_period = '111 s'
|
||||
|
||||
gc_period = '222 s'
|
||||
gc_horizon = 222
|
||||
|
||||
wait_lsn_timeout = '111 s'
|
||||
wal_redo_timeout = '111 s'
|
||||
|
||||
@@ -697,11 +591,6 @@ id = 10
|
||||
id: ZNodeId(10),
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
compaction_target_size: defaults::DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(defaults::DEFAULT_COMPACTION_PERIOD)?,
|
||||
gc_horizon: defaults::DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
|
||||
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
|
||||
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
|
||||
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
|
||||
@@ -742,11 +631,6 @@ id = 10
|
||||
id: ZNodeId(10),
|
||||
listen_pg_addr: "127.0.0.1:64000".to_string(),
|
||||
listen_http_addr: "127.0.0.1:9898".to_string(),
|
||||
checkpoint_distance: 111,
|
||||
compaction_target_size: 111,
|
||||
compaction_period: Duration::from_secs(111),
|
||||
gc_horizon: 222,
|
||||
gc_period: Duration::from_secs(222),
|
||||
wait_lsn_timeout: Duration::from_secs(111),
|
||||
wal_redo_timeout: Duration::from_secs(111),
|
||||
superuser: "zzzz".to_string(),
|
||||
|
||||
@@ -25,6 +25,12 @@ pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<ZTenantId>,
pub checkpoint_distance: Option<u64>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub pitr_interval: Option<String>,
}

#[serde_as]

@@ -36,3 +42,17 @@ pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId
pub struct StatusResponse {
pub id: ZNodeId,
}

impl TenantCreateRequest {
pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
checkpoint_distance: None,
compaction_target_size: None,
compaction_period: None,
gc_horizon: None,
gc_period: None,
pitr_interval: None,
}
}
}
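An editorial aside, not part of the diff: a hedged example of building this request in Rust, with invented values.

    let request = TenantCreateRequest {
        new_tenant_id: None, // let the pageserver pick an id
        checkpoint_distance: Some(10_000),
        compaction_target_size: None,
        compaction_period: Some("60 s".to_string()),
        gc_horizon: Some(1024),
        gc_period: Some("100 s".to_string()),
        pitr_interval: Some("3600 s".to_string()),
    };
    // serde turns this into the JSON body accepted by POST /tenant; fields left as
    // None fall back to TenantConf::default() in tenant_create_handler below.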
@@ -333,6 +333,16 @@ paths:
|
||||
new_tenant_id:
|
||||
type: string
|
||||
format: hex
|
||||
gc_period:
|
||||
type: string
|
||||
gc_horizon:
|
||||
type: integer
|
||||
pitr_interval:
|
||||
type: string
|
||||
checkpoint_distance:
|
||||
type: integer
|
||||
compaction_period:
|
||||
type: string
|
||||
responses:
|
||||
"201":
|
||||
description: New tenant created successfully
|
||||
|
||||
@@ -23,6 +23,7 @@ use super::models::{
};
use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
use crate::repository::Repository;
use crate::tenant_config::TenantConf;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};

@@ -290,6 +291,28 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let remote_index = get_state(&request).remote_index.clone();

let mut tenant_conf = TenantConf::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period =
humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?;
}
if let Some(gc_horizon) = request_data.gc_horizon {
tenant_conf.gc_horizon = gc_horizon;
}
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval =
humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?;
}
if let Some(checkpoint_distance) = request_data.checkpoint_distance {
tenant_conf.checkpoint_distance = checkpoint_distance;
}
if let Some(compaction_target_size) = request_data.compaction_target_size {
tenant_conf.compaction_target_size = compaction_target_size;
}
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period =
humantime::parse_duration(&compaction_period).map_err(ApiError::from_err)?;
}
let target_tenant_id = request_data
.new_tenant_id
.map(ZTenantId::from)

@@ -297,8 +320,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo

let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
let conf = get_config(&request);

tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index)
tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
})
.await
.map_err(ApiError::from_err)??;
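An editorial aside, not part of the diff: the duration-valued fields (gc_period, compaction_period, pitr_interval) go through humantime::parse_duration, so they accept human-readable strings; "100 s", "60sec" and "30 days" are all forms that appear elsewhere in this compare. A hedged sketch:

    fn duration_examples() -> anyhow::Result<()> {
        use std::time::Duration;
        assert_eq!(humantime::parse_duration("100 s")?, Duration::from_secs(100));
        assert_eq!(humantime::parse_duration("60sec")?, Duration::from_secs(60));
        assert_eq!(humantime::parse_duration("30 days")?, Duration::from_secs(30 * 24 * 3600));
        Ok(())
    }

A malformed string surfaces as an error through map_err(ApiError::from_err) in the handler above.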
@@ -29,11 +29,13 @@ use std::ops::{Bound::Included, Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{self, AtomicBool};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::KeySpace;
|
||||
use crate::tenant_config::{TenantConf, TenantConfFile};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
|
||||
use crate::repository::{
|
||||
@@ -117,6 +119,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
///
|
||||
pub struct LayeredRepository {
|
||||
pub conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConf,
|
||||
|
||||
tenantid: ZTenantId,
|
||||
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
@@ -140,6 +144,10 @@ pub struct LayeredRepository {
|
||||
impl Repository for LayeredRepository {
|
||||
type Timeline = LayeredTimeline;
|
||||
|
||||
fn get_tenant_conf(&self) -> TenantConf {
|
||||
self.tenant_conf
|
||||
}
|
||||
|
||||
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.get_timeline_internal(timelineid, &timelines)
|
||||
@@ -269,6 +277,7 @@ impl Repository for LayeredRepository {
|
||||
&self,
|
||||
target_timelineid: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
let timeline_str = target_timelineid
|
||||
@@ -278,7 +287,7 @@ impl Repository for LayeredRepository {
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["gc", &self.tenantid.to_string(), &timeline_str])
|
||||
.observe_closure_duration(|| {
|
||||
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
|
||||
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -540,6 +549,7 @@ impl LayeredRepository {
|
||||
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConf,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
tenantid: ZTenantId,
|
||||
remote_index: RemoteIndex,
|
||||
@@ -548,6 +558,7 @@ impl LayeredRepository {
|
||||
LayeredRepository {
|
||||
tenantid,
|
||||
conf,
|
||||
tenant_conf,
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
gc_cs: Mutex::new(()),
|
||||
walredo_mgr,
|
||||
@@ -556,6 +567,58 @@ impl LayeredRepository {
|
||||
}
|
||||
}
|
||||
|
||||
/// Save tenant's config to file
|
||||
pub fn save_tenantconf(
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
tenant_conf: TenantConf,
|
||||
first_save: bool,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("saving tenantconf").entered();
|
||||
let path = TenantConf::tenantconf_path(conf, tenantid);
|
||||
info!("save tenantconf to {}", path.display());
|
||||
|
||||
// use OpenOptions to ensure file presence is consistent with first_save
|
||||
let mut file = VirtualFile::open_with_options(
|
||||
&path,
|
||||
OpenOptions::new().write(true).create_new(first_save),
|
||||
)?;
|
||||
|
||||
let data = TenantConfFile::from(tenant_conf);
|
||||
let tenantconf_bytes = data.to_bytes().context("Failed to get tenantconf bytes")?;
|
||||
|
||||
if file.write(&tenantconf_bytes)? != tenantconf_bytes.len() {
|
||||
bail!("Could not write all the tenantconf bytes in a single call");
|
||||
}
|
||||
file.sync_all()?;
|
||||
|
||||
// fsync the parent directory to ensure the directory entry is durable
|
||||
if first_save {
|
||||
let tenant_dir = File::open(
|
||||
&path
|
||||
.parent()
|
||||
.expect("Tenantconf should always have a parent dir"),
|
||||
)?;
|
||||
tenant_dir.sync_all()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn load_tenantconf(
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
) -> Result<TenantConf> {
|
||||
let path = TenantConf::tenantconf_path(conf, tenantid);
|
||||
info!("loading tenantconf from {}", path.display());
|
||||
let tenantconf_bytes = std::fs::read(&path)?;
|
||||
let tenant_conf_file = TenantConfFile::from_bytes(&tenantconf_bytes);
|
||||
match tenant_conf_file {
|
||||
Ok(tenant_conf) => return Ok(tenant_conf.body),
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
}
|
||||
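An editorial aside, not part of the diff: a hedged sketch of the round trip these two helpers provide, assuming a conf and tenantid are in scope and error handling is elided.

    let tenant_conf = TenantConf::default();
    // First save: create_new(true) insists that the config file does not exist yet.
    LayeredRepository::save_tenantconf(conf, tenantid, tenant_conf, true)?;
    let loaded = LayeredRepository::load_tenantconf(conf, tenantid)?;
    assert_eq!(loaded, tenant_conf);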
|
||||
/// Save timeline metadata to file
|
||||
fn save_metadata(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -631,6 +694,7 @@ impl LayeredRepository {
|
||||
&self,
|
||||
target_timelineid: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
let _span_guard =
|
||||
@@ -706,7 +770,7 @@ impl LayeredRepository {
|
||||
timeline.checkpoint(CheckpointConfig::Forced)?;
|
||||
info!("timeline {} checkpoint_before_gc done", timelineid);
|
||||
}
|
||||
timeline.update_gc_info(branchpoints, cutoff);
|
||||
timeline.update_gc_info(branchpoints, cutoff, pitr);
|
||||
let result = timeline.gc()?;
|
||||
|
||||
totals += result;
|
||||
@@ -823,6 +887,11 @@ struct GcInfo {
|
||||
///
|
||||
/// FIXME: is this inclusive or exclusive?
|
||||
cutoff: Lsn,
|
||||
|
||||
/// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
|
||||
/// minus 'pitr_interval'
|
||||
///
|
||||
pitr: Duration,
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
@@ -1028,6 +1097,7 @@ impl LayeredTimeline {
|
||||
gc_info: RwLock::new(GcInfo {
|
||||
retain_lsns: Vec::new(),
|
||||
cutoff: Lsn(0),
|
||||
pitr: Duration::ZERO,
|
||||
}),
|
||||
|
||||
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
|
||||
@@ -1370,9 +1440,11 @@ impl LayeredTimeline {
|
||||
///
|
||||
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
|
||||
let distance = last_lsn.widening_sub(self.last_freeze_at.load());
|
||||
if distance >= self.conf.checkpoint_distance.into() {
|
||||
if distance >= tenant_conf.checkpoint_distance.into() {
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
}
|
||||
@@ -1578,13 +1650,18 @@ impl LayeredTimeline {
|
||||
// above. Rewrite it.
|
||||
let _compaction_cs = self.compaction_cs.lock().unwrap();
|
||||
|
||||
let target_file_size = self.conf.checkpoint_distance;
|
||||
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
|
||||
let target_file_size = tenant_conf.checkpoint_distance;
|
||||
|
||||
// Define partitioning schema if needed
|
||||
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
|
||||
{
|
||||
let (partitioning, lsn) =
|
||||
pgdir.repartition(self.get_last_record_lsn(), self.conf.compaction_target_size)?;
|
||||
let (partitioning, lsn) = pgdir.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
tenant_conf.compaction_target_size,
|
||||
)?;
|
||||
let timer = self.create_images_time_histo.start_timer();
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
@@ -1810,10 +1887,11 @@ impl LayeredTimeline {
|
||||
/// the latest LSN subtracted by a constant, and doesn't do anything smart
|
||||
/// to figure out what read-only nodes might actually need.)
|
||||
///
|
||||
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn) {
|
||||
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
|
||||
let mut gc_info = self.gc_info.write().unwrap();
|
||||
gc_info.retain_lsns = retain_lsns;
|
||||
gc_info.cutoff = cutoff;
|
||||
gc_info.pitr = pitr;
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1824,7 +1902,7 @@ impl LayeredTimeline {
|
||||
/// obsolete.
|
||||
///
|
||||
fn gc(&self) -> Result<GcResult> {
|
||||
let now = Instant::now();
|
||||
let now = SystemTime::now();
|
||||
let mut result: GcResult = Default::default();
|
||||
let disk_consistent_lsn = self.get_disk_consistent_lsn();
|
||||
|
||||
@@ -1833,6 +1911,7 @@ impl LayeredTimeline {
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
let retain_lsns = &gc_info.retain_lsns;
|
||||
let cutoff = gc_info.cutoff;
|
||||
let pitr = gc_info.pitr;
|
||||
|
||||
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
|
||||
|
||||
@@ -1850,8 +1929,9 @@ impl LayeredTimeline {
|
||||
//
|
||||
// Garbage collect the layer if all conditions are satisfied:
|
||||
// 1. it is older than cutoff LSN;
|
||||
// 2. it doesn't need to be retained for 'retain_lsns';
|
||||
// 3. newer on-disk image layers cover the layer's whole key range
|
||||
// 2. it is older than PITR interval;
|
||||
// 3. it doesn't need to be retained for 'retain_lsns';
|
||||
// 4. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
@@ -1877,8 +1957,27 @@ impl LayeredTimeline {
|
||||
result.layers_needed_by_cutoff += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// 2. Is it needed by a child branch?
// 2. Is it newer than the PITR interval?
// We use the modification time of the layer file to estimate the update time.
// This estimate is not very precise, but maintaining an LSN->timestamp map seems like overkill.
// Users are not expected to need high precision here, and the estimate
// is conservative: the modification time of a file is always newer than the actual time the
// version was created, so it is safe for users.
//
if let Ok(metadata) = fs::metadata(&l.filename()) {
let last_modified = metadata.modified()?;
if now.duration_since(last_modified)? < pitr {
debug!(
"keeping {} because its modification time {:?} is newer than PITR {:?}",
l.filename().display(),
last_modified,
pitr
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
}
// 3. Is it needed by a child branch?
// NOTE: with this check we would keep data that
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
@@ -1897,7 +1996,7 @@ impl LayeredTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Is there a later on-disk layer for this relation?
|
||||
// 4. Is there a later on-disk layer for this relation?
|
||||
//
|
||||
// The end-LSN is exclusive, while disk_consistent_lsn is
|
||||
// inclusive. For example, if disk_consistent_lsn is 100, it is
|
||||
@@ -1938,7 +2037,7 @@ impl LayeredTimeline {
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
|
||||
result.elapsed = now.elapsed();
|
||||
result.elapsed = now.elapsed()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@@ -2215,7 +2314,8 @@ pub mod tests {
|
||||
}
|
||||
|
||||
let cutoff = tline.get_last_record_lsn();
|
||||
tline.update_gc_info(Vec::new(), cutoff);
|
||||
|
||||
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
|
||||
tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
tline.compact()?;
|
||||
tline.gc()?;
|
||||
@@ -2285,7 +2385,7 @@ pub mod tests {
|
||||
// Perform a cycle of checkpoint, compaction, and GC
|
||||
println!("checkpointing {}", lsn);
|
||||
let cutoff = tline.get_last_record_lsn();
|
||||
tline.update_gc_info(Vec::new(), cutoff);
|
||||
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
|
||||
tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
tline.compact()?;
|
||||
tline.gc()?;
|
||||
@@ -2362,7 +2462,7 @@ pub mod tests {
|
||||
// Perform a cycle of checkpoint, compaction, and GC
|
||||
println!("checkpointing {}", lsn);
|
||||
let cutoff = tline.get_last_record_lsn();
|
||||
tline.update_gc_info(Vec::new(), cutoff);
|
||||
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
|
||||
tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
tline.compact()?;
|
||||
tline.gc()?;
|
||||
|
||||
@@ -10,6 +10,7 @@ pub mod pgdatadir_mapping;
|
||||
pub mod reltag;
|
||||
pub mod remote_storage;
|
||||
pub mod repository;
|
||||
pub mod tenant_config;
|
||||
pub mod tenant_mgr;
|
||||
pub mod tenant_threads;
|
||||
pub mod thread_mgr;
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::net::TcpListener;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
use zenith_metrics::{register_histogram_vec, HistogramVec};
|
||||
use zenith_utils::auth::{self, JwtAuth};
|
||||
@@ -672,6 +673,37 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
}
|
||||
}
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("show ") {
|
||||
// show <tenant_id>
|
||||
let (_, params_raw) = query_string.split_at("show ".len());
|
||||
let params = params_raw.split(' ').collect::<Vec<_>>();
|
||||
ensure!(params.len() == 1, "invalid param number for config command");
|
||||
let tenantid = ZTenantId::from_str(params[0])?;
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"compaction_target_size"),
|
||||
RowDescriptor::int8_col(b"compaction_period"),
|
||||
RowDescriptor::int8_col(b"gc_horizon"),
|
||||
RowDescriptor::int8_col(b"gc_period"),
|
||||
RowDescriptor::int8_col(b"pitr_interval"),
|
||||
]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(tenant_conf.checkpoint_distance.to_string().as_bytes()),
|
||||
Some(tenant_conf.compaction_target_size.to_string().as_bytes()),
|
||||
Some(
|
||||
tenant_conf
|
||||
.compaction_period
|
||||
.as_secs()
|
||||
.to_string()
|
||||
.as_bytes(),
|
||||
),
|
||||
Some(tenant_conf.gc_horizon.to_string().as_bytes()),
|
||||
Some(tenant_conf.gc_period.as_secs().to_string().as_bytes()),
|
||||
Some(tenant_conf.pitr_interval.as_secs().to_string().as_bytes()),
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("do_gc ") {
|
||||
// Run GC immediately on given timeline.
|
||||
// FIXME: This is just for tests. See test_runner/batch_others/test_gc.py.
|
||||
@@ -689,16 +721,21 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
|
||||
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
|
||||
let gc_horizon: u64 = caps
|
||||
.get(4)
|
||||
.map(|h| h.as_str().parse())
|
||||
.unwrap_or(Ok(self.conf.gc_horizon))?;
|
||||
.unwrap_or(Ok(tenant_conf.gc_horizon))?;
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
|
||||
let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"layers_total"),
|
||||
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
|
||||
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
|
||||
RowDescriptor::int8_col(b"layers_needed_by_branches"),
|
||||
RowDescriptor::int8_col(b"layers_not_updated"),
|
||||
RowDescriptor::int8_col(b"layers_removed"),
|
||||
@@ -707,6 +744,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(result.layers_total.to_string().as_bytes()),
|
||||
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
|
||||
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
|
||||
Some(result.layers_needed_by_branches.to_string().as_bytes()),
|
||||
Some(result.layers_not_updated.to_string().as_bytes()),
|
||||
Some(result.layers_removed.to_string().as_bytes()),
|
||||
|
||||
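Reader's note: the test_tenant_conf.py test added later in this compare exercises this branch with "show <tenant_id>" and asserts the row (10000, 1048576, 60, 1024, 100, 3600), matching the column order declared here (checkpoint_distance, compaction_target_size, compaction_period, gc_horizon, gc_period, pitr_interval), with the three durations reported in whole seconds.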
@@ -1,5 +1,6 @@
|
||||
use crate::layered_repository::metadata::TimelineMetadata;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::tenant_config::TenantConf;
|
||||
use crate::walrecord::ZenithWalRecord;
|
||||
use crate::CheckpointConfig;
|
||||
use anyhow::{bail, Result};
|
||||
@@ -249,6 +250,7 @@ pub trait Repository: Send + Sync {
|
||||
&self,
|
||||
timelineid: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult>;
|
||||
|
||||
@@ -261,6 +263,8 @@ pub trait Repository: Send + Sync {
|
||||
|
||||
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
|
||||
fn get_remote_index(&self) -> &RemoteIndex;
|
||||
|
||||
fn get_tenant_conf(&self) -> TenantConf;
|
||||
}
|
||||
|
||||
/// A timeline, that belongs to the current repository.
|
||||
@@ -303,6 +307,7 @@ impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
|
||||
pub struct GcResult {
|
||||
pub layers_total: u64,
|
||||
pub layers_needed_by_cutoff: u64,
|
||||
pub layers_needed_by_pitr: u64,
|
||||
pub layers_needed_by_branches: u64,
|
||||
pub layers_not_updated: u64,
|
||||
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
|
||||
@@ -313,6 +318,7 @@ pub struct GcResult {
|
||||
impl AddAssign for GcResult {
|
||||
fn add_assign(&mut self, other: Self) {
|
||||
self.layers_total += other.layers_total;
|
||||
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
|
||||
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
|
||||
self.layers_needed_by_branches += other.layers_needed_by_branches;
|
||||
self.layers_not_updated += other.layers_not_updated;
|
||||
@@ -454,6 +460,7 @@ pub mod repo_harness {
|
||||
|
||||
pub struct RepoHarness<'a> {
|
||||
pub conf: &'static PageServerConf,
|
||||
pub tenant_conf: TenantConf,
|
||||
pub tenant_id: ZTenantId,
|
||||
|
||||
pub lock_guard: (
|
||||
@@ -485,12 +492,15 @@ pub mod repo_harness {
|
||||
// OK in a test.
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
let tenant_conf = TenantConf::dummy_conf();
|
||||
|
||||
let tenant_id = ZTenantId::generate();
|
||||
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
|
||||
fs::create_dir_all(conf.timelines_path(&tenant_id))?;
|
||||
|
||||
Ok(Self {
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
lock_guard,
|
||||
})
|
||||
@@ -505,6 +515,7 @@ pub mod repo_harness {
|
||||
|
||||
let repo = LayeredRepository::new(
|
||||
self.conf,
|
||||
self.tenant_conf,
|
||||
walredo_mgr,
|
||||
self.tenant_id,
|
||||
RemoteIndex::empty(),
|
||||
@@ -720,7 +731,7 @@ mod tests {
|
||||
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
|
||||
// and compaction works. But it does set the 'cutoff' point so that the cross check
|
||||
// below should fail.
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
|
||||
// try to branch at lsn 25, should fail because we already garbage collected the data
|
||||
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
|
||||
@@ -771,7 +782,7 @@ mod tests {
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20))?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
|
||||
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
|
||||
match tline.get(*TEST_KEY, Lsn(0x25)) {
|
||||
@@ -794,7 +805,7 @@ mod tests {
|
||||
.get_timeline_load(NEW_TIMELINE_ID)
|
||||
.expect("Should have a local timeline");
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
|
||||
|
||||
Ok(())
|
||||
@@ -813,7 +824,7 @@ mod tests {
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
|
||||
|
||||
// run gc on parent
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
|
||||
// Check that the data is still accessible on the branch.
|
||||
assert_eq!(
|
||||
|
||||
pageserver/src/tenant_config.rs (new file, 220 lines)
@@ -0,0 +1,220 @@
//! Functions for handling per-tenant configuration options.
//!
//! If a tenant is created with the --config option,
//! the tenant-specific config is stored in the tenant's directory.
//! Otherwise, the global pageserver config is used.
//!
//! If the tenant config file is corrupted, the tenant is disabled.
//! We cannot fall back to the global or default config, because wrong settings
//! may lead to data loss.
//!
use crate::config::PageServerConf;
|
||||
use crate::STORAGE_FORMAT_VERSION;
|
||||
use anyhow::ensure;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
|
||||
pub const TENANT_CONFIG_NAME: &str = "config";
|
||||
|
||||
pub mod defaults {
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
pub const DEFAULT_GC_PERIOD: &str = "100 s";
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TenantConf {
|
||||
pub checkpoint_distance: u64,
|
||||
pub compaction_target_size: u64,
|
||||
pub compaction_period: Duration,
|
||||
pub gc_horizon: u64,
|
||||
pub gc_period: Duration,
|
||||
pub pitr_interval: Duration,
|
||||
}
|
||||
|
||||
/// We assume that a write of up to TENANTCONF_MAX_SIZE bytes is atomic.
|
||||
///
|
||||
/// This is the same assumption that PostgreSQL makes with the control file,
|
||||
/// see PG_CONTROL_MAX_SAFE_SIZE
|
||||
const TENANTCONF_MAX_SIZE: usize = 512;
|
||||
|
||||
/// TenantConfFile is stored on disk in tenant's directory
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct TenantConfFile {
|
||||
hdr: TenantConfHeader,
|
||||
pub body: TenantConf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct TenantConfHeader {
|
||||
checksum: u32, // CRC of serialized tenantconf body
|
||||
size: u16, // size of serialized tenantconf
|
||||
format_version: u16, // storage format version (used for compatibility checks)
|
||||
}
|
||||
const TENANTCONF_HDR_SIZE: usize = std::mem::size_of::<TenantConfHeader>();
|
||||
|
||||
impl TenantConfFile {
|
||||
pub fn new(
|
||||
checkpoint_distance: u64,
|
||||
compaction_target_size: u64,
|
||||
compaction_period: Duration,
|
||||
gc_horizon: u64,
|
||||
gc_period: Duration,
|
||||
pitr_interval: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
hdr: TenantConfHeader {
|
||||
checksum: 0,
|
||||
size: 0,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
},
|
||||
body: TenantConf {
|
||||
gc_period,
|
||||
gc_horizon,
|
||||
pitr_interval,
|
||||
checkpoint_distance,
|
||||
compaction_period,
|
||||
compaction_target_size,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from(tconf: TenantConf) -> Self {
|
||||
Self {
|
||||
hdr: TenantConfHeader {
|
||||
checksum: 0,
|
||||
size: 0,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
},
|
||||
body: TenantConf {
|
||||
gc_period: tconf.gc_period,
|
||||
gc_horizon: tconf.gc_horizon,
|
||||
pitr_interval: tconf.pitr_interval,
|
||||
checkpoint_distance: tconf.checkpoint_distance,
|
||||
compaction_period: tconf.compaction_period,
|
||||
compaction_target_size: tconf.compaction_target_size,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_bytes(tenantconf_bytes: &[u8]) -> anyhow::Result<Self> {
|
||||
ensure!(
|
||||
tenantconf_bytes.len() == TENANTCONF_MAX_SIZE,
|
||||
"tenantconf bytes size is wrong"
|
||||
);
|
||||
let hdr = TenantConfHeader::des(&tenantconf_bytes[0..TENANTCONF_HDR_SIZE])?;
|
||||
ensure!(
|
||||
hdr.format_version == STORAGE_FORMAT_VERSION,
|
||||
"format version mismatch"
|
||||
);
|
||||
let tenantconf_size = hdr.size as usize;
|
||||
ensure!(
|
||||
tenantconf_size <= TENANTCONF_MAX_SIZE,
|
||||
"corrupted tenantconf file"
|
||||
);
|
||||
let calculated_checksum =
|
||||
crc32c::crc32c(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size]);
|
||||
ensure!(
|
||||
hdr.checksum == calculated_checksum,
|
||||
"tenantconf checksum mismatch"
|
||||
);
|
||||
let body = TenantConf::des(&tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size])?;
|
||||
|
||||
Ok(TenantConfFile { hdr, body })
|
||||
}
|
||||
|
||||
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
|
||||
let body_bytes = self.body.ser()?;
|
||||
let tenantconf_size = TENANTCONF_HDR_SIZE + body_bytes.len();
|
||||
let hdr = TenantConfHeader {
|
||||
size: tenantconf_size as u16,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
checksum: crc32c::crc32c(&body_bytes),
|
||||
};
|
||||
let hdr_bytes = hdr.ser()?;
|
||||
let mut tenantconf_bytes = vec![0u8; TENANTCONF_MAX_SIZE];
|
||||
tenantconf_bytes[0..TENANTCONF_HDR_SIZE].copy_from_slice(&hdr_bytes);
|
||||
tenantconf_bytes[TENANTCONF_HDR_SIZE..tenantconf_size].copy_from_slice(&body_bytes);
|
||||
Ok(tenantconf_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl TenantConf {
|
||||
pub fn default() -> TenantConf {
|
||||
use defaults::*;
|
||||
|
||||
TenantConf {
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period"),
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
/// where certain tenant's tenantconf file should be located.
|
||||
pub fn tenantconf_path(conf: &'static PageServerConf, tenantid: ZTenantId) -> PathBuf {
|
||||
conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn dummy_conf() -> Self {
|
||||
TenantConf {
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
compaction_target_size: 4 * 1024 * 1024,
|
||||
compaction_period: Duration::from_secs(10),
|
||||
gc_horizon: defaults::DEFAULT_GC_HORIZON,
|
||||
gc_period: Duration::from_secs(10),
|
||||
pitr_interval: Duration::from_secs(60 * 60),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn tenantconf_serializes_correctly() {
|
||||
let original_tenantconf = TenantConfFile::new(
|
||||
111,
|
||||
111,
|
||||
Duration::from_secs(111),
|
||||
222,
|
||||
Duration::from_secs(111),
|
||||
Duration::from_secs(60 * 60),
|
||||
);
|
||||
|
||||
let tenantconf_bytes = original_tenantconf
|
||||
.to_bytes()
|
||||
.expect("Should serialize correct tenantconf to bytes");
|
||||
|
||||
let deserialized_tenantconf = TenantConfFile::from_bytes(&tenantconf_bytes)
|
||||
.expect("Should deserialize its own bytes");
|
||||
|
||||
assert_eq!(
|
||||
deserialized_tenantconf.body, original_tenantconf.body,
|
||||
"Tenantconf that was serialized to bytes and deserialized back should not change"
|
||||
);
|
||||
}
|
||||
}
|
||||
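An editorial aside, not part of the new file: callers are expected to start from TenantConf::default() and override individual knobs, as tenant_create_handler does above. A hedged sketch with invented values:

    use std::time::Duration;

    let mut tenant_conf = TenantConf::default();
    tenant_conf.gc_horizon = 1024;
    tenant_conf.checkpoint_distance = 10_000;
    tenant_conf.pitr_interval = Duration::from_secs(3600);
    // Fields left untouched keep the defaults above, e.g. the 30-day PITR interval.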
@@ -5,6 +5,7 @@ use crate::config::PageServerConf;
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::repository::{Repository, TimelineSyncStatusUpdate};
|
||||
use crate::tenant_config::TenantConf;
|
||||
use crate::thread_mgr;
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
use crate::timelines;
|
||||
@@ -63,7 +64,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||
TENANTS.lock().unwrap()
|
||||
}
|
||||
|
||||
// Sets up wal redo manager and repository for tenant. Reduces code duplocation.
|
||||
// Sets up wal redo manager and repository for tenant. Reduces code duplication.
|
||||
// Used during pageserver startup, or when new tenant is attached to pageserver.
|
||||
pub fn load_local_repo(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -75,9 +76,15 @@ pub fn load_local_repo(
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
||||
|
||||
// Try to load config file
|
||||
let tenant_conf = LayeredRepository::load_tenantconf(conf, tenant_id)
|
||||
.with_context(|| format!("Failed to load tenant state for id {}", tenant_id))
|
||||
.unwrap();
|
||||
|
||||
// Set up an object repository, for actual data storage.
|
||||
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
|
||||
conf,
|
||||
tenant_conf,
|
||||
Arc::new(walredo_mgr),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
@@ -174,6 +181,7 @@ pub fn shutdown_all_tenants() {
|
||||
|
||||
pub fn create_tenant_repository(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConf,
|
||||
tenantid: ZTenantId,
|
||||
remote_index: RemoteIndex,
|
||||
) -> Result<Option<ZTenantId>> {
|
||||
@@ -186,6 +194,7 @@ pub fn create_tenant_repository(
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
||||
let repo = timelines::create_repo(
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenantid,
|
||||
CreateRepo::Real {
|
||||
wal_redo_manager,
|
||||
@@ -210,7 +219,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
|
||||
/// Change the state of a tenant to Active and launch its compactor and GC
|
||||
/// threads. If the tenant was already in Active state or Stopping, does nothing.
|
||||
///
|
||||
pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
|
||||
pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
|
||||
let mut m = access_tenants();
|
||||
let tenant = m
|
||||
.get_mut(&tenant_id)
|
||||
@@ -230,7 +239,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
|
||||
None,
|
||||
"Compactor thread",
|
||||
true,
|
||||
move || crate::tenant_threads::compact_loop(tenant_id, conf),
|
||||
move || crate::tenant_threads::compact_loop(tenant_id),
|
||||
)?;
|
||||
|
||||
let gc_spawn_result = thread_mgr::spawn(
|
||||
@@ -239,7 +248,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
|
||||
None,
|
||||
"GC thread",
|
||||
true,
|
||||
move || crate::tenant_threads::gc_loop(tenant_id, conf),
|
||||
move || crate::tenant_threads::gc_loop(tenant_id),
|
||||
)
|
||||
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
|
||||
|
||||
@@ -251,7 +260,6 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> R
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||
return gc_spawn_result;
|
||||
}
|
||||
|
||||
tenant.state = TenantState::Active;
|
||||
}
|
||||
|
||||
@@ -290,7 +298,7 @@ pub fn get_timeline_for_tenant_load(
|
||||
.get_timeline_load(timelineid)
|
||||
.with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?;
|
||||
|
||||
let repartition_distance = tenant.repo.conf.checkpoint_distance / 10;
|
||||
let repartition_distance = tenant.repo.get_tenant_conf().checkpoint_distance / 10;
|
||||
|
||||
let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
|
||||
page_tline.init_logical_size()?;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
use crate::config::PageServerConf;
|
||||
use crate::repository::Repository;
|
||||
use crate::tenant_mgr;
|
||||
use crate::tenant_mgr::TenantState;
|
||||
@@ -12,8 +11,8 @@ use zenith_utils::zid::ZTenantId;
|
||||
///
|
||||
/// Compaction thread's main loop
|
||||
///
|
||||
pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
|
||||
if let Err(err) = compact_loop_ext(tenantid, conf) {
|
||||
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
if let Err(err) = compact_loop_ext(tenantid) {
|
||||
error!("compact loop terminated with error: {:?}", err);
|
||||
Err(err)
|
||||
} else {
|
||||
@@ -21,13 +20,15 @@ pub fn compact_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
|
||||
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
|
||||
std::thread::sleep(conf.compaction_period);
|
||||
std::thread::sleep(tenant_conf.compaction_period);
|
||||
trace!("compaction thread for tenant {} waking up", tenantid);
|
||||
|
||||
// Compact timelines
|
||||
@@ -46,23 +47,29 @@ fn compact_loop_ext(tenantid: ZTenantId, conf: &'static PageServerConf) -> Resul
|
||||
///
|
||||
/// GC thread's main loop
|
||||
///
|
||||
pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
|
||||
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
|
||||
trace!("gc thread for tenant {} waking up", tenantid);
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
|
||||
// Garbage collect old files that are not needed for PITR anymore
|
||||
if conf.gc_horizon > 0 {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
repo.gc_iteration(None, conf.gc_horizon, false)?;
|
||||
if tenant_conf.gc_horizon > 0 {
|
||||
repo.gc_iteration(
|
||||
None,
|
||||
tenant_conf.gc_horizon,
|
||||
tenant_conf.pitr_interval,
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
|
||||
// TODO Write it in more adequate way using
|
||||
// condvar.wait_timeout() or something
|
||||
let mut sleep_time = conf.gc_period.as_secs();
|
||||
let mut sleep_time = tenant_conf.gc_period.as_secs();
|
||||
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
|
||||
{
|
||||
sleep_time -= 1;
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::{
|
||||
layered_repository::metadata::TimelineMetadata,
|
||||
remote_storage::RemoteIndex,
|
||||
repository::{LocalTimelineState, Repository},
|
||||
tenant_config::TenantConf,
|
||||
DatadirTimeline, RepositoryImpl,
|
||||
};
|
||||
use crate::{import_datadir, LOG_FILE_NAME};
|
||||
@@ -149,8 +150,8 @@ pub fn init_pageserver(
|
||||
|
||||
if let Some(tenant_id) = create_tenant {
|
||||
println!("initializing tenantid {}", tenant_id);
|
||||
let repo =
|
||||
create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
|
||||
let repo = create_repo(conf, TenantConf::default(), tenant_id, CreateRepo::Dummy)
|
||||
.context("failed to create repo")?;
|
||||
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
|
||||
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
|
||||
.context("failed to create initial timeline")?;
|
||||
@@ -173,6 +174,7 @@ pub enum CreateRepo {
|
||||
|
||||
pub fn create_repo(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConf,
|
||||
tenant_id: ZTenantId,
|
||||
create_repo: CreateRepo,
|
||||
) -> Result<Arc<RepositoryImpl>> {
|
||||
@@ -209,8 +211,12 @@ pub fn create_repo(
|
||||
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
|
||||
info!("created directory structure in {}", repo_dir.display());
|
||||
|
||||
// Save tenant's config
|
||||
LayeredRepository::save_tenantconf(conf, tenant_id, tenant_conf, true)?;
|
||||
|
||||
Ok(Arc::new(LayeredRepository::new(
|
||||
conf,
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
remote_index,
|
||||
|
||||
@@ -93,7 +93,7 @@ pub fn launch_wal_receiver(
|
||||
receivers.insert((tenantid, timelineid), receiver);
|
||||
|
||||
// Update tenant state and start tenant threads, if they are not running yet.
|
||||
tenant_mgr::activate_tenant(conf, tenantid)?;
|
||||
tenant_mgr::activate_tenant(tenantid)?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
|
||||
test_runner/batch_others/test_tenant_conf.py (new file, 31 lines)
@@ -0,0 +1,31 @@
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder
|
||||
|
||||
|
||||
def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
"""Test per-tenant configuration"""
env = zenith_env_builder.init_start()
tenant = env.zenith_cli.create_tenant(
|
||||
conf={
|
||||
'gc_period': '100sec',
|
||||
'gc_horizon': '1024',
|
||||
'pitr_interval': '3600sec',
|
||||
'checkpoint_distance': '10000',
|
||||
'compaction_period': '60sec',
|
||||
'compaction_target_size': '1048576'
|
||||
})
|
||||
|
||||
env.zenith_cli.create_timeline(f'test_tenant_conf', tenant_id=tenant)
|
||||
pg = env.postgres.create_start(
|
||||
"test_tenant_conf",
|
||||
"main",
|
||||
tenant,
|
||||
)
|
||||
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
with psconn.cursor() as pscur:
|
||||
pscur.execute(f"show {tenant.hex}")
|
||||
assert pscur.fetchone() == (10000, 1048576, 60, 1024, 100, 3600)
|
||||
@@ -853,13 +853,20 @@ class ZenithCli:
|
||||
self.env = env
|
||||
pass
|
||||
|
||||
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
|
||||
def create_tenant(self,
|
||||
tenant_id: Optional[uuid.UUID] = None,
|
||||
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
|
||||
"""
|
||||
Creates a new tenant, returns its id and its initial timeline's id.
|
||||
"""
|
||||
if tenant_id is None:
|
||||
tenant_id = uuid.uuid4()
|
||||
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
|
||||
if conf is None:
|
||||
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
|
||||
else:
|
||||
res = self.raw_cli(
|
||||
['tenant', 'create', '--tenant-id', tenant_id.hex] +
|
||||
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
|
||||
res.check_returncode()
|
||||
return tenant_id
|
||||
|
||||
|
||||
@@ -164,7 +164,8 @@ fn main() -> Result<()> {
.subcommand(App::new("create")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
)
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
)
.subcommand(
App::new("pageserver")

@@ -521,8 +522,12 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
}
Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?;
let tenant_conf: HashMap<_, _> = create_match
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver
.tenant_create(initial_tenant_id)?
.tenant_create(initial_tenant_id, tenant_conf)?
.ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?;
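An editorial aside, not part of the diff: a hedged sketch of what the -c parsing above produces; the key:value flag format matches what the test fixture passes, e.g. zenith tenant create -c gc_horizon:1024 -c pitr_interval:3600sec.

    use std::collections::HashMap;

    let raw = ["gc_horizon:1024", "pitr_interval:3600sec"];
    // Each occurrence is split once on ':' and collected into the map handed to tenant_create.
    let tenant_conf: HashMap<&str, &str> = raw.iter().filter_map(|c| c.split_once(':')).collect();
    assert_eq!(tenant_conf.get("gc_horizon"), Some(&"1024"));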