Add get_timeline_for_tenant() to tenant_mgr (#615)

Most of the previous usages of get_repository_for_tenant were followed
by immediately getting a timeline in that repository, without keeping it
around for longer.

The new `get_timeline_for_tenant` function implements that same
behavior, but in one line.
This commit is contained in:
Max Sharnoff
2021-09-16 18:38:21 +01:00
committed by GitHub
parent bbe4f39790
commit 3743344e64
5 changed files with 29 additions and 29 deletions

View File

@@ -257,7 +257,7 @@ pub(crate) fn get_tenants(conf: &PageServerConf) -> Result<Vec<String>> {
}
pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<Vec<BranchInfo>> {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?;
// Each branch has a corresponding record (text file) in the refs/branches
// with timeline_id.
@@ -277,7 +277,7 @@ pub(crate) fn create_branch(
startpoint_str: &str,
tenantid: &ZTenantId,
) -> Result<BranchInfo> {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?;
if conf.branch_path(branchname, tenantid).exists() {
anyhow::bail!("branch {} already exists", branchname);

View File

@@ -131,7 +131,7 @@ async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>,
let path = conf.branch_path(branch_name, &tenantid);
let response_data = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(&tenantid)?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
BranchInfo::from_path(path, conf, &tenantid, &repo)
})
.await

View File

@@ -10,7 +10,7 @@
// *callmemaybe <zenith timelineid> $url* -- ask pageserver to start walreceiver on $url
//
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
@@ -230,10 +230,7 @@ impl PageServerHandler {
tenantid: ZTenantId,
) -> anyhow::Result<()> {
// Check that the timeline exists
let repository = tenant_mgr::get_repository_for_tenant(&tenantid)?;
let timeline = repository
.get_timeline(timelineid)
.context(format!("error fetching timeline {}", timelineid))?;
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
/* switch client to COPYBOTH */
pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -392,10 +389,8 @@ impl PageServerHandler {
tenantid: ZTenantId,
) -> anyhow::Result<()> {
// check that the timeline exists
let repository = tenant_mgr::get_repository_for_tenant(&tenantid)?;
let timeline = repository
.get_timeline(timelineid)
.context(format!("error fetching timeline {}", timelineid))?;
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
/* switch client to COPYOUT */
pgb.write_message(&BeMessage::CopyOutResponse)?;
info!("sent CopyOut");
@@ -533,10 +528,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.check_permission(Some(tenantid))?;
// Check that the timeline exists
let repository = tenant_mgr::get_repository_for_tenant(&tenantid)?;
repository
.get_timeline(timelineid)
.context(format!("error fetching timeline {}", timelineid))?;
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());
@@ -631,7 +623,7 @@ impl postgres_backend::Handler for PageServerHandler {
.map(|h| h.as_str().parse())
.unwrap_or(Ok(self.conf.gc_horizon))?;
let repo = tenant_mgr::get_repository_for_tenant(&tenantid)?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;

View File

@@ -3,17 +3,17 @@
use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::repository::Repository;
use crate::repository::{Repository, Timeline};
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};
use lazy_static::lazy_static;
use log::info;
use std::collections::HashMap;
use std::fs;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
pub static ref REPOSITORY: Mutex<HashMap<ZTenantId, Arc<dyn Repository>>> =
@@ -67,9 +67,18 @@ pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc<dyn Repositor
o.insert(tenantid, repo);
}
pub fn get_repository_for_tenant(tenantid: &ZTenantId) -> Result<Arc<dyn Repository>> {
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
let o = &REPOSITORY.lock().unwrap();
o.get(tenantid)
o.get(&tenantid)
.map(|repo| Arc::clone(repo))
.ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid))
}
pub fn get_timeline_for_tenant(
tenantid: ZTenantId,
timelineid: ZTimelineId,
) -> Result<Arc<dyn Timeline>> {
get_repository_for_tenant(tenantid)?
.get_timeline(timelineid)
.with_context(|| format!("cannot fetch timeline {}", timelineid))
}

View File

@@ -72,7 +72,7 @@ pub fn launch_wal_receiver(
let _walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
thread_main(conf, timelineid, &tenantid);
thread_main(conf, timelineid, tenantid);
})
.unwrap();
}
@@ -93,7 +93,7 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
//
// This is the entry point for the WAL receiver thread.
//
fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: &ZTenantId) {
fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
info!(
"WAL receiver thread started for timeline : '{}'",
timelineid
@@ -123,7 +123,7 @@ fn walreceiver_main(
conf: &PageServerConf,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: &ZTenantId,
tenantid: ZTenantId,
) -> Result<(), Error> {
// Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr);
@@ -149,8 +149,7 @@ fn walreceiver_main(
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let repository = tenant_mgr::get_repository_for_tenant(tenantid)?;
let timeline = repository.get_timeline(timelineid).unwrap();
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
//
// Start streaming the WAL, from where we left off previously.
@@ -199,7 +198,7 @@ fn walreceiver_main(
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
data,
tenantid,
&tenantid,
)?;
trace!("received XLogData between {} and {}", startlsn, endlsn);
@@ -260,7 +259,7 @@ fn walreceiver_main(
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
last_rec_lsn,
tenantid,
&tenantid,
)?;
if newest_segno - oldest_segno >= 10 {