mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
Fixes after rebase
This commit is contained in:
committed by
Anastasia Lubennikova
parent
51c7ceb1d9
commit
3da8233f08
@@ -343,7 +343,7 @@ impl PageServerNode {
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: Option<ZTenantId>,
|
||||
settings: HashMap<&str, &str>
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<Option<ZTenantId>> {
|
||||
let tenant_id_string = self
|
||||
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
|
||||
|
||||
@@ -164,25 +164,8 @@ impl TenantConf {
|
||||
pitr_interval: conf.pitr_interval,
|
||||
checkpoint_distance: conf.checkpoint_distance,
|
||||
compaction_period: conf.compaction_period,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// use dedicated enum for builder to better indicate the intention
|
||||
// and avoid possible confusion with nested options
|
||||
pub enum BuilderValue<T> {
|
||||
Set(T),
|
||||
NotSet,
|
||||
}
|
||||
|
||||
impl<T> BuilderValue<T> {
|
||||
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
|
||||
match self {
|
||||
Self::Set(v) => Ok(v),
|
||||
Self::NotSet => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn save(&self, conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> {
|
||||
let _enter = info_span!("saving tenant config").entered();
|
||||
let path = conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME);
|
||||
@@ -214,6 +197,22 @@ impl<T> BuilderValue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// use dedicated enum for builder to better indicate the intention
|
||||
// and avoid possible confusion with nested options
|
||||
pub enum BuilderValue<T> {
|
||||
Set(T),
|
||||
NotSet,
|
||||
}
|
||||
|
||||
impl<T> BuilderValue<T> {
|
||||
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
|
||||
match self {
|
||||
Self::Set(v) => Ok(v),
|
||||
Self::NotSet => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// needed to simplify config construction
|
||||
struct PageServerConfigBuilder {
|
||||
listen_pg_addr: BuilderValue<String>,
|
||||
@@ -314,7 +313,7 @@ impl PageServerConfigBuilder {
|
||||
self.gc_period = BuilderValue::Set(gc_period)
|
||||
}
|
||||
|
||||
pub fn pitr_interval(&mut self, gc_period: Duration) {
|
||||
pub fn pitr_interval(&mut self, pitr_interval: Duration) {
|
||||
self.pitr_interval = BuilderValue::Set(pitr_interval)
|
||||
}
|
||||
|
||||
@@ -386,7 +385,9 @@ impl PageServerConfigBuilder {
|
||||
.gc_horizon
|
||||
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
|
||||
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
|
||||
pitr_interval: self.pitr_interval.ok_or(anyhow::anyhow!("missing pitr_interval"))?,
|
||||
pitr_interval: self
|
||||
.pitr_interval
|
||||
.ok_or(anyhow::anyhow!("missing pitr_interval"))?,
|
||||
wait_lsn_timeout: self
|
||||
.wait_lsn_timeout
|
||||
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
|
||||
|
||||
@@ -43,7 +43,7 @@ pub struct StatusResponse {
|
||||
}
|
||||
|
||||
impl TenantCreateRequest {
|
||||
pub fn new(new_tenant_id: ZTenantId) -> TenantCreateRequest {
|
||||
pub fn new(new_tenant_id: Option<ZTenantId>) -> TenantCreateRequest {
|
||||
TenantCreateRequest {
|
||||
new_tenant_id,
|
||||
checkpoint_distance: None,
|
||||
|
||||
@@ -318,7 +318,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let new_tenant_id = tokio::task::spawn_blocking(move || {
|
||||
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
|
||||
|
||||
tenant_mgr::create_tenant_repository(conf, tenant_conf,target_tenant_id, remote_index)
|
||||
tenant_mgr::create_tenant_repository(conf, tenant_conf, target_tenant_id, remote_index)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
@@ -33,7 +33,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
|
||||
use crate::config::{PageServerConf, TenantConf};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::keyspace::KeySpace;
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
|
||||
@@ -834,6 +834,11 @@ struct GcInfo {
|
||||
///
|
||||
/// FIXME: is this inclusive or exclusive?
|
||||
cutoff: Lsn,
|
||||
|
||||
/// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()'
|
||||
/// minus 'pitr_interval'
|
||||
///
|
||||
pitr: Duration,
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
@@ -1039,6 +1044,7 @@ impl LayeredTimeline {
|
||||
gc_info: RwLock::new(GcInfo {
|
||||
retain_lsns: Vec::new(),
|
||||
cutoff: Lsn(0),
|
||||
pitr: Duration::ZERO,
|
||||
}),
|
||||
|
||||
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
|
||||
@@ -1821,10 +1827,11 @@ impl LayeredTimeline {
|
||||
/// the latest LSN subtracted by a constant, and doesn't do anything smart
|
||||
/// to figure out what read-only nodes might actually need.)
|
||||
///
|
||||
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn) {
|
||||
fn update_gc_info(&self, retain_lsns: Vec<Lsn>, cutoff: Lsn, pitr: Duration) {
|
||||
let mut gc_info = self.gc_info.write().unwrap();
|
||||
gc_info.retain_lsns = retain_lsns;
|
||||
gc_info.cutoff = cutoff;
|
||||
gc_info.pitr = pitr;
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1900,19 +1907,13 @@ impl LayeredTimeline {
|
||||
if let Ok(metadata) = fs::metadata(&l.filename()) {
|
||||
let last_modified = metadata.modified()?;
|
||||
if now.duration_since(last_modified)? < pitr {
|
||||
info!(
|
||||
"keeping {} {}-{} because it's modification time {:?} is newer than PiTR {:?}",
|
||||
seg,
|
||||
l.get_start_lsn(),
|
||||
l.get_end_lsn(),
|
||||
last_modified,
|
||||
pitr
|
||||
);
|
||||
if seg.rel.is_relation() {
|
||||
result.ondisk_relfiles_needed_by_pitr += 1;
|
||||
} else {
|
||||
result.ondisk_nonrelfiles_needed_by_pitr += 1;
|
||||
}
|
||||
debug!(
|
||||
"keeping {} because it's modification time {:?} is newer than PITR {:?}",
|
||||
l.filename().display(),
|
||||
last_modified,
|
||||
pitr
|
||||
);
|
||||
result.layers_needed_by_pitr += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
@@ -2253,7 +2254,12 @@ pub mod tests {
|
||||
}
|
||||
|
||||
let cutoff = tline.get_last_record_lsn();
|
||||
tline.update_gc_info(Vec::new(), cutoff);
|
||||
let parts = keyspace
|
||||
.clone()
|
||||
.to_keyspace()
|
||||
.partition(TEST_FILE_SIZE as u64);
|
||||
|
||||
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
|
||||
tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
tline.compact()?;
|
||||
tline.gc()?;
|
||||
@@ -2323,7 +2329,7 @@ pub mod tests {
|
||||
// Perform a cycle of checkpoint, compaction, and GC
|
||||
println!("checkpointing {}", lsn);
|
||||
let cutoff = tline.get_last_record_lsn();
|
||||
tline.update_gc_info(Vec::new(), cutoff);
|
||||
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
|
||||
tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
tline.compact()?;
|
||||
tline.gc()?;
|
||||
@@ -2400,7 +2406,7 @@ pub mod tests {
|
||||
// Perform a cycle of checkpoint, compaction, and GC
|
||||
println!("checkpointing {}", lsn);
|
||||
let cutoff = tline.get_last_record_lsn();
|
||||
tline.update_gc_info(Vec::new(), cutoff);
|
||||
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO);
|
||||
tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
tline.compact()?;
|
||||
tline.gc()?;
|
||||
|
||||
@@ -683,14 +683,14 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
let conf = repo.get_tenant_conf();
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"checkpoint_period"),
|
||||
RowDescriptor::int8_col(b"compaction_period"),
|
||||
RowDescriptor::int8_col(b"gc_horizon"),
|
||||
RowDescriptor::int8_col(b"gc_period"),
|
||||
RowDescriptor::int8_col(b"pitr_interval"),
|
||||
]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(conf.checkpoint_distance.to_string().as_bytes()),
|
||||
Some(conf.checkpoint_period.as_secs().to_string().as_bytes()),
|
||||
Some(conf.compaction_period.as_secs().to_string().as_bytes()),
|
||||
Some(conf.gc_horizon.to_string().as_bytes()),
|
||||
Some(conf.gc_period.as_secs().to_string().as_bytes()),
|
||||
Some(conf.pitr_interval.as_secs().to_string().as_bytes()),
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::config::TenantConf;
|
||||
use crate::layered_repository::metadata::TimelineMetadata;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::walrecord::ZenithWalRecord;
|
||||
use crate::config::TenantConf;
|
||||
use crate::CheckpointConfig;
|
||||
use anyhow::{bail, Result};
|
||||
use byteorder::{ByteOrder, BE};
|
||||
@@ -727,7 +727,7 @@ mod tests {
|
||||
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
|
||||
// and compaction works. But it does set the 'cutoff' point so that the cross check
|
||||
// below should fail.
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
|
||||
// try to branch at lsn 25, should fail because we already garbage collected the data
|
||||
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! This module acts as a switchboard to access different repositories managed by this
|
||||
//! page server.
|
||||
|
||||
use crate::branches;
|
||||
use crate::config::{PageServerConf, TenantConf};
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
@@ -76,9 +75,19 @@ pub fn load_local_repo(
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
||||
|
||||
// Try to load config file
|
||||
let tenant_conf = match TenantConf::load(conf, tenant_id) {
|
||||
Ok(tenant_conf) => tenant_conf,
|
||||
Err(e) => {
|
||||
error!("Failed to load tenant state: {:?}", e);
|
||||
TenantConf::from(conf)
|
||||
}
|
||||
};
|
||||
|
||||
// Set up an object repository, for actual data storage.
|
||||
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
|
||||
conf,
|
||||
tenant_conf,
|
||||
Arc::new(walredo_mgr),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
@@ -179,9 +188,6 @@ pub fn create_tenant_repository(
|
||||
tenantid: ZTenantId,
|
||||
remote_index: RemoteIndex,
|
||||
) -> Result<Option<ZTenantId>> {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
||||
let repo = branches::create_repo(conf, tenant_conf, tenantid, wal_redo_manager)?;
|
||||
|
||||
match access_tenants().entry(tenantid) {
|
||||
Entry::Occupied(_) => {
|
||||
debug!("tenant {} already exists", tenantid);
|
||||
@@ -189,10 +195,9 @@ pub fn create_tenant_repository(
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
||||
let tenant_conf = match TenantConf::load(conf, tenant_id)?;
|
||||
let repo = timelines::create_repo(
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenant_conf,
|
||||
tenantid,
|
||||
CreateRepo::Real {
|
||||
wal_redo_manager,
|
||||
@@ -217,7 +222,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
|
||||
/// Change the state of a tenant to Active and launch its compactor and GC
|
||||
/// threads. If the tenant was already in Active state or Stopping, does nothing.
|
||||
///
|
||||
pub fn activate_tenant(tenantid: ZTenantId) -> Result<()> {
|
||||
pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
|
||||
let mut m = access_tenants();
|
||||
let tenant = m
|
||||
.get_mut(&tenant_id)
|
||||
|
||||
@@ -28,7 +28,7 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let tenant_conf = repo.get_tenant_conf();
|
||||
|
||||
std::thread::sleep(tenant_conf.checkpoint_period);
|
||||
std::thread::sleep(tenant_conf.compaction_period);
|
||||
trace!("compaction thread for tenant {} waking up", tenantid);
|
||||
|
||||
// Compact timelines
|
||||
|
||||
@@ -20,6 +20,7 @@ use zenith_utils::{crashsafe_dir, logging};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
config::TenantConf,
|
||||
layered_repository::metadata::TimelineMetadata,
|
||||
remote_storage::RemoteIndex,
|
||||
repository::{LocalTimelineState, Repository},
|
||||
@@ -149,8 +150,8 @@ pub fn init_pageserver(
|
||||
|
||||
if let Some(tenant_id) = create_tenant {
|
||||
println!("initializing tenantid {}", tenant_id);
|
||||
let repo =
|
||||
create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
|
||||
let repo = create_repo(conf, TenantConf::from(conf), tenant_id, CreateRepo::Dummy)
|
||||
.context("failed to create repo")?;
|
||||
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
|
||||
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
|
||||
.context("failed to create initial timeline")?;
|
||||
@@ -173,6 +174,7 @@ pub enum CreateRepo {
|
||||
|
||||
pub fn create_repo(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConf,
|
||||
tenant_id: ZTenantId,
|
||||
create_repo: CreateRepo,
|
||||
) -> Result<Arc<RepositoryImpl>> {
|
||||
@@ -211,6 +213,7 @@ pub fn create_repo(
|
||||
|
||||
Ok(Arc::new(LayeredRepository::new(
|
||||
conf,
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
remote_index,
|
||||
|
||||
@@ -4,11 +4,20 @@ import pytest
|
||||
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder
|
||||
|
||||
def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder):
|
||||
env = zenith_env_builder.init()
|
||||
"""Test per tenant configuration"""
|
||||
tenant = env.create_tenant(conf={'gc_period':'100sec','gc_horizon':'1024','pitr_interval':'3600sec','checkpoint_distance':'10000','checkpoint_period':'60sec'})
|
||||
|
||||
def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
|
||||
env = zenith_env_builder.init_start()
|
||||
"""Test per tenant configuration"""
|
||||
tenant = env.zenith_cli.create_tenant(
|
||||
conf={
|
||||
'gc_period': '100sec',
|
||||
'gc_horizon': '1024',
|
||||
'pitr_interval': '3600sec',
|
||||
'checkpoint_distance': '10000',
|
||||
'compaction_period': '60sec'
|
||||
})
|
||||
|
||||
env.zenith_cli.create_timeline(f'test_tenant_conf', tenant_id=tenant)
|
||||
pg = env.postgres.create_start(
|
||||
"test_tenant_conf",
|
||||
"main",
|
||||
|
||||
@@ -853,7 +853,9 @@ class ZenithCli:
|
||||
self.env = env
|
||||
pass
|
||||
|
||||
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None, conf: Optional[Dict[str,str]] = None) -> uuid.UUID:
|
||||
def create_tenant(self,
|
||||
tenant_id: Optional[uuid.UUID] = None,
|
||||
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
|
||||
"""
|
||||
Creates a new tenant, returns its id and its initial timeline's id.
|
||||
"""
|
||||
@@ -862,7 +864,9 @@ class ZenithCli:
|
||||
if conf is None:
|
||||
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
|
||||
else:
|
||||
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex] + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
|
||||
res = self.raw_cli(
|
||||
['tenant', 'create', '--tenant-id', tenant_id.hex] +
|
||||
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
|
||||
res.check_returncode()
|
||||
return tenant_id
|
||||
|
||||
|
||||
@@ -525,7 +525,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
|
||||
let tenant_conf: HashMap<_, _> = create_match
|
||||
.values_of("config")
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.unwrap_or(HashMap::new());
|
||||
.unwrap_or_default();
|
||||
let new_tenant_id = pageserver
|
||||
.tenant_create(initial_tenant_id, tenant_conf)?
|
||||
.ok_or_else(|| {
|
||||
|
||||
Reference in New Issue
Block a user