From 3da8233f084a824acf064c64235c65528ce349f4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 11 Mar 2022 07:20:20 +0300 Subject: [PATCH] Fixes after rebase --- control_plane/src/storage.rs | 2 +- pageserver/src/config.rs | 39 +++++++++--------- pageserver/src/http/models.rs | 2 +- pageserver/src/http/routes.rs | 2 +- pageserver/src/layered_repository.rs | 42 +++++++++++--------- pageserver/src/page_service.rs | 4 +- pageserver/src/repository.rs | 4 +- pageserver/src/tenant_mgr.rs | 19 +++++---- pageserver/src/tenant_threads.rs | 2 +- pageserver/src/timelines.rs | 7 +++- test_runner/batch_others/test_tenant_conf.py | 17 ++++++-- test_runner/fixtures/zenith_fixtures.py | 8 +++- zenith/src/main.rs | 2 +- 13 files changed, 89 insertions(+), 61 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 296c227c01..4b9fbe7c08 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -343,7 +343,7 @@ impl PageServerNode { pub fn tenant_create( &self, new_tenant_id: Option, - settings: HashMap<&str, &str> + settings: HashMap<&str, &str>, ) -> anyhow::Result> { let tenant_id_string = self .http_request(Method::POST, format!("{}/tenant", self.http_base_url)) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f6c51bb4b8..3605b5716a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -164,25 +164,8 @@ impl TenantConf { pitr_interval: conf.pitr_interval, checkpoint_distance: conf.checkpoint_distance, compaction_period: conf.compaction_period, - } - } -} - -// use dedicated enum for builder to better indicate the intention -// and avoid possible confusion with nested options -pub enum BuilderValue { - Set(T), - NotSet, -} - -impl BuilderValue { - pub fn ok_or(self, err: E) -> Result { - match self { - Self::Set(v) => Ok(v), - Self::NotSet => Err(err), } } - pub fn save(&self, conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> { let _enter = info_span!("saving tenant config").entered(); let path = conf.tenant_path(&tenantid).join(TENANT_CONFIG_NAME); @@ -214,6 +197,22 @@ impl BuilderValue { } } +// use dedicated enum for builder to better indicate the intention +// and avoid possible confusion with nested options +pub enum BuilderValue { + Set(T), + NotSet, +} + +impl BuilderValue { + pub fn ok_or(self, err: E) -> Result { + match self { + Self::Set(v) => Ok(v), + Self::NotSet => Err(err), + } + } +} + // needed to simplify config construction struct PageServerConfigBuilder { listen_pg_addr: BuilderValue, @@ -314,7 +313,7 @@ impl PageServerConfigBuilder { self.gc_period = BuilderValue::Set(gc_period) } - pub fn pitr_interval(&mut self, gc_period: Duration) { + pub fn pitr_interval(&mut self, pitr_interval: Duration) { self.pitr_interval = BuilderValue::Set(pitr_interval) } @@ -386,7 +385,9 @@ impl PageServerConfigBuilder { .gc_horizon .ok_or(anyhow::anyhow!("missing gc_horizon"))?, gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?, - pitr_interval: self.pitr_interval.ok_or(anyhow::anyhow!("missing pitr_interval"))?, + pitr_interval: self + .pitr_interval + .ok_or(anyhow::anyhow!("missing pitr_interval"))?, wait_lsn_timeout: self .wait_lsn_timeout .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?, diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 12ee30be4d..6131515210 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -43,7 +43,7 @@ pub struct StatusResponse { } impl TenantCreateRequest { - pub fn new(new_tenant_id: ZTenantId) -> TenantCreateRequest { + pub fn new(new_tenant_id: Option) -> TenantCreateRequest { TenantCreateRequest { new_tenant_id, checkpoint_distance: None, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f915e942de..5dca3bd863 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -318,7 +318,7 @@ async fn tenant_create_handler(mut request: Request) -> Result, cutoff: Lsn) { + fn update_gc_info(&self, retain_lsns: Vec, cutoff: Lsn, pitr: Duration) { let mut gc_info = self.gc_info.write().unwrap(); gc_info.retain_lsns = retain_lsns; gc_info.cutoff = cutoff; + gc_info.pitr = pitr; } /// @@ -1900,19 +1907,13 @@ impl LayeredTimeline { if let Ok(metadata) = fs::metadata(&l.filename()) { let last_modified = metadata.modified()?; if now.duration_since(last_modified)? < pitr { - info!( - "keeping {} {}-{} because it's modification time {:?} is newer than PiTR {:?}", - seg, - l.get_start_lsn(), - l.get_end_lsn(), - last_modified, - pitr - ); - if seg.rel.is_relation() { - result.ondisk_relfiles_needed_by_pitr += 1; - } else { - result.ondisk_nonrelfiles_needed_by_pitr += 1; - } + debug!( + "keeping {} because it's modification time {:?} is newer than PITR {:?}", + l.filename().display(), + last_modified, + pitr + ); + result.layers_needed_by_pitr += 1; continue 'outer; } } @@ -2253,7 +2254,12 @@ pub mod tests { } let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff); + let parts = keyspace + .clone() + .to_keyspace() + .partition(TEST_FILE_SIZE as u64); + + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2323,7 +2329,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2400,7 +2406,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2c169ba7fd..8c0a1f4d61 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -683,14 +683,14 @@ impl postgres_backend::Handler for PageServerHandler { let conf = repo.get_tenant_conf(); pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), - RowDescriptor::int8_col(b"checkpoint_period"), + RowDescriptor::int8_col(b"compaction_period"), RowDescriptor::int8_col(b"gc_horizon"), RowDescriptor::int8_col(b"gc_period"), RowDescriptor::int8_col(b"pitr_interval"), ]))? .write_message_noflush(&BeMessage::DataRow(&[ Some(conf.checkpoint_distance.to_string().as_bytes()), - Some(conf.checkpoint_period.as_secs().to_string().as_bytes()), + Some(conf.compaction_period.as_secs().to_string().as_bytes()), Some(conf.gc_horizon.to_string().as_bytes()), Some(conf.gc_period.as_secs().to_string().as_bytes()), Some(conf.pitr_interval.as_secs().to_string().as_bytes()), diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index adf47ca79e..348b2b91e0 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,7 +1,7 @@ +use crate::config::TenantConf; use crate::layered_repository::metadata::TimelineMetadata; use crate::remote_storage::RemoteIndex; use crate::walrecord::ZenithWalRecord; -use crate::config::TenantConf; use crate::CheckpointConfig; use anyhow::{bail, Result}; use byteorder::{ByteOrder, BE}; @@ -727,7 +727,7 @@ mod tests { // FIXME: this doesn't actually remove any layer currently, given how the checkpointing // and compaction works. But it does set the 'cutoff' point so that the cross check // below should fail. - repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; // try to branch at lsn 25, should fail because we already garbage collected the data match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c885dc83f8..eae792d45b 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -1,7 +1,6 @@ //! This module acts as a switchboard to access different repositories managed by this //! page server. -use crate::branches; use crate::config::{PageServerConf, TenantConf}; use crate::layered_repository::LayeredRepository; use crate::remote_storage::RemoteIndex; @@ -76,9 +75,19 @@ pub fn load_local_repo( // Set up a WAL redo manager, for applying WAL records. let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); + // Try to load config file + let tenant_conf = match TenantConf::load(conf, tenant_id) { + Ok(tenant_conf) => tenant_conf, + Err(e) => { + error!("Failed to load tenant state: {:?}", e); + TenantConf::from(conf) + } + }; + // Set up an object repository, for actual data storage. let repo: Arc = Arc::new(LayeredRepository::new( conf, + tenant_conf, Arc::new(walredo_mgr), tenant_id, remote_index.clone(), @@ -179,9 +188,6 @@ pub fn create_tenant_repository( tenantid: ZTenantId, remote_index: RemoteIndex, ) -> Result> { - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); - let repo = branches::create_repo(conf, tenant_conf, tenantid, wal_redo_manager)?; - match access_tenants().entry(tenantid) { Entry::Occupied(_) => { debug!("tenant {} already exists", tenantid); @@ -189,10 +195,9 @@ pub fn create_tenant_repository( } Entry::Vacant(v) => { let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); - let tenant_conf = match TenantConf::load(conf, tenant_id)?; let repo = timelines::create_repo( conf, - tenant_conf, + tenant_conf, tenantid, CreateRepo::Real { wal_redo_manager, @@ -217,7 +222,7 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option { /// Change the state of a tenant to Active and launch its compactor and GC /// threads. If the tenant was already in Active state or Stopping, does nothing. /// -pub fn activate_tenant(tenantid: ZTenantId) -> Result<()> { +pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> { let mut m = access_tenants(); let tenant = m .get_mut(&tenant_id) diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 0b21b70e7e..0971f1087e 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -28,7 +28,7 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; let tenant_conf = repo.get_tenant_conf(); - std::thread::sleep(tenant_conf.checkpoint_period); + std::thread::sleep(tenant_conf.compaction_period); trace!("compaction thread for tenant {} waking up", tenantid); // Compact timelines diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 105c3c869f..c6fab256fc 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -20,6 +20,7 @@ use zenith_utils::{crashsafe_dir, logging}; use crate::{ config::PageServerConf, + config::TenantConf, layered_repository::metadata::TimelineMetadata, remote_storage::RemoteIndex, repository::{LocalTimelineState, Repository}, @@ -149,8 +150,8 @@ pub fn init_pageserver( if let Some(tenant_id) = create_tenant { println!("initializing tenantid {}", tenant_id); - let repo = - create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?; + let repo = create_repo(conf, TenantConf::from(conf), tenant_id, CreateRepo::Dummy) + .context("failed to create repo")?; let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate); bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref()) .context("failed to create initial timeline")?; @@ -173,6 +174,7 @@ pub enum CreateRepo { pub fn create_repo( conf: &'static PageServerConf, + tenant_conf: TenantConf, tenant_id: ZTenantId, create_repo: CreateRepo, ) -> Result> { @@ -211,6 +213,7 @@ pub fn create_repo( Ok(Arc::new(LayeredRepository::new( conf, + tenant_conf, wal_redo_manager, tenant_id, remote_index, diff --git a/test_runner/batch_others/test_tenant_conf.py b/test_runner/batch_others/test_tenant_conf.py index 02d1e14da1..0e2fc5df17 100644 --- a/test_runner/batch_others/test_tenant_conf.py +++ b/test_runner/batch_others/test_tenant_conf.py @@ -4,11 +4,20 @@ import pytest from fixtures.zenith_fixtures import ZenithEnvBuilder -def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder): - env = zenith_env_builder.init() - """Test per tenant configuration""" - tenant = env.create_tenant(conf={'gc_period':'100sec','gc_horizon':'1024','pitr_interval':'3600sec','checkpoint_distance':'10000','checkpoint_period':'60sec'}) +def test_tenant_config(zenith_env_builder: ZenithEnvBuilder): + env = zenith_env_builder.init_start() + """Test per tenant configuration""" + tenant = env.zenith_cli.create_tenant( + conf={ + 'gc_period': '100sec', + 'gc_horizon': '1024', + 'pitr_interval': '3600sec', + 'checkpoint_distance': '10000', + 'compaction_period': '60sec' + }) + + env.zenith_cli.create_timeline(f'test_tenant_conf', tenant_id=tenant) pg = env.postgres.create_start( "test_tenant_conf", "main", diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 09164bd4d0..224603ed15 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -853,7 +853,9 @@ class ZenithCli: self.env = env pass - def create_tenant(self, tenant_id: Optional[uuid.UUID] = None, conf: Optional[Dict[str,str]] = None) -> uuid.UUID: + def create_tenant(self, + tenant_id: Optional[uuid.UUID] = None, + conf: Optional[Dict[str, str]] = None) -> uuid.UUID: """ Creates a new tenant, returns its id and its initial timeline's id. """ @@ -862,7 +864,9 @@ class ZenithCli: if conf is None: res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex]) else: - res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex] + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), [])) + res = self.raw_cli( + ['tenant', 'create', '--tenant-id', tenant_id.hex] + + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), [])) res.check_returncode() return tenant_id diff --git a/zenith/src/main.rs b/zenith/src/main.rs index ca169867f6..f3757f66ae 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -525,7 +525,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re let tenant_conf: HashMap<_, _> = create_match .values_of("config") .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) - .unwrap_or(HashMap::new()); + .unwrap_or_default(); let new_tenant_id = pageserver .tenant_create(initial_tenant_id, tenant_conf)? .ok_or_else(|| {