From 3743344e64b59dd01640af58e0ce29d3e30860a0 Mon Sep 17 00:00:00 2001 From: Max Sharnoff Date: Thu, 16 Sep 2021 18:38:21 +0100 Subject: [PATCH] Add `get_timeline_for_tenant()` to `tenant_mgr` (#615) Most of the previous usages of get_repository_for_tenant were followed by immediately getting a timeline in that repository, without keeping it around for longer. The new `get_timeline_for_tenant` function implements that same behavior, but in one line. --- pageserver/src/branches.rs | 4 ++-- pageserver/src/http/routes.rs | 2 +- pageserver/src/page_service.rs | 20 ++++++-------------- pageserver/src/tenant_mgr.rs | 19 ++++++++++++++----- pageserver/src/walreceiver.rs | 13 ++++++------- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 4177adca66..4094757f31 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -257,7 +257,7 @@ pub(crate) fn get_tenants(conf: &PageServerConf) -> Result> { } pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Result> { - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?; // Each branch has a corresponding record (text file) in the refs/branches // with timeline_id. @@ -277,7 +277,7 @@ pub(crate) fn create_branch( startpoint_str: &str, tenantid: &ZTenantId, ) -> Result { - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?; if conf.branch_path(branchname, tenantid).exists() { anyhow::bail!("branch {} already exists", branchname); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 1c440e2d87..cd6b84b22f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -131,7 +131,7 @@ async fn branch_detail_handler(request: Request) -> Result, let path = conf.branch_path(branch_name, &tenantid); let response_data = tokio::task::spawn_blocking(move || { - let repo = tenant_mgr::get_repository_for_tenant(&tenantid)?; + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; BranchInfo::from_path(path, conf, &tenantid, &repo) }) .await diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f8f2989344..f38a4b9eed 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,7 +10,7 @@ // *callmemaybe $url* -- ask pageserver to start walreceiver on $url // -use anyhow::{anyhow, bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; @@ -230,10 +230,7 @@ impl PageServerHandler { tenantid: ZTenantId, ) -> anyhow::Result<()> { // Check that the timeline exists - let repository = tenant_mgr::get_repository_for_tenant(&tenantid)?; - let timeline = repository - .get_timeline(timelineid) - .context(format!("error fetching timeline {}", timelineid))?; + let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; /* switch client to COPYBOTH */ pgb.write_message(&BeMessage::CopyBothResponse)?; @@ -392,10 +389,8 @@ impl PageServerHandler { tenantid: ZTenantId, ) -> anyhow::Result<()> { // check that the timeline exists - let repository = tenant_mgr::get_repository_for_tenant(&tenantid)?; - let timeline = repository - .get_timeline(timelineid) - .context(format!("error fetching timeline {}", timelineid))?; + let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; + /* switch client to COPYOUT */ pgb.write_message(&BeMessage::CopyOutResponse)?; info!("sent CopyOut"); @@ -533,10 +528,7 @@ impl postgres_backend::Handler for PageServerHandler { self.check_permission(Some(tenantid))?; // Check that the timeline exists - let repository = tenant_mgr::get_repository_for_tenant(&tenantid)?; - repository - .get_timeline(timelineid) - .context(format!("error fetching timeline {}", timelineid))?; + tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned()); @@ -631,7 +623,7 @@ impl postgres_backend::Handler for PageServerHandler { .map(|h| h.as_str().parse()) .unwrap_or(Ok(self.conf.gc_horizon))?; - let repo = tenant_mgr::get_repository_for_tenant(&tenantid)?; + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 920bcaa251..3258ab2567 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,17 +3,17 @@ use crate::branches; use crate::layered_repository::LayeredRepository; -use crate::repository::Repository; +use crate::repository::{Repository, Timeline}; use crate::walredo::PostgresRedoManager; use crate::PageServerConf; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use lazy_static::lazy_static; use log::info; use std::collections::HashMap; use std::fs; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { pub static ref REPOSITORY: Mutex>> = @@ -67,9 +67,18 @@ pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc Result> { +pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { let o = &REPOSITORY.lock().unwrap(); - o.get(tenantid) + o.get(&tenantid) .map(|repo| Arc::clone(repo)) .ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid)) } + +pub fn get_timeline_for_tenant( + tenantid: ZTenantId, + timelineid: ZTimelineId, +) -> Result> { + get_repository_for_tenant(tenantid)? + .get_timeline(timelineid) + .with_context(|| format!("cannot fetch timeline {}", timelineid)) +} diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 69721be480..fc9afb95cf 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -72,7 +72,7 @@ pub fn launch_wal_receiver( let _walreceiver_thread = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(move || { - thread_main(conf, timelineid, &tenantid); + thread_main(conf, timelineid, tenantid); }) .unwrap(); } @@ -93,7 +93,7 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { // // This is the entry point for the WAL receiver thread. // -fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: &ZTenantId) { +fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId) { info!( "WAL receiver thread started for timeline : '{}'", timelineid @@ -123,7 +123,7 @@ fn walreceiver_main( conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str, - tenantid: &ZTenantId, + tenantid: ZTenantId, ) -> Result<(), Error> { // Connect to the database in replication mode. info!("connecting to {:?}", wal_producer_connstr); @@ -149,8 +149,7 @@ fn walreceiver_main( let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; - let repository = tenant_mgr::get_repository_for_tenant(tenantid)?; - let timeline = repository.get_timeline(timelineid).unwrap(); + let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; // // Start streaming the WAL, from where we left off previously. @@ -199,7 +198,7 @@ fn walreceiver_main( &timelineid, pg_constants::WAL_SEGMENT_SIZE, data, - tenantid, + &tenantid, )?; trace!("received XLogData between {} and {}", startlsn, endlsn); @@ -260,7 +259,7 @@ fn walreceiver_main( &timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn, - tenantid, + &tenantid, )?; if newest_segno - oldest_segno >= 10 {