add tenant id tracking to safekeeper

Previously timelines were namespaced only by ZTimelineId, so this patch
adds ZTenant id to the key of a hashtable

closes #381
This commit is contained in:
Dmitry Rodionov
2021-09-01 12:37:17 +03:00
committed by Dmitry
parent 59c19d6e18
commit 3c5452da88
3 changed files with 29 additions and 16 deletions

View File

@@ -46,8 +46,8 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
let me_conf: Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
let callme = format!(
"callmemaybe {} {} host={} port={} options='-c ztimelineid={}'",
tenantid, timelineid, host, port, timelineid,
"callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'",
tenantid, timelineid, host, port, timelineid, tenantid,
);
loop {

View File

@@ -6,14 +6,14 @@ use crate::receive_wal::ReceiveWalConn;
use crate::replication::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use std::str::FromStr;
use std::sync::Arc;
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor};
use zenith_utils::zid::ZTimelineId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::timeline::CreateControlFile;
@@ -23,19 +23,25 @@ pub struct SendWalHandler {
pub conf: WalAcceptorConf,
/// assigned application name
pub appname: Option<String>,
pub tenantid: Option<ZTenantId>,
pub timelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
}
impl postgres_backend::Handler for SendWalHandler {
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
match sm.params.get("ztimelineid") {
Some(ref ztimelineid) => {
let ztlid = ZTimelineId::from_str(ztimelineid)?;
self.timelineid = Some(ztlid);
}
_ => bail!("timelineid is required"),
}
let ztimelineid = sm
.params
.get("ztimelineid")
.ok_or(anyhow!("timelineid is required"))?;
self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?);
let ztenantid = sm
.params
.get("ztenantid")
.ok_or(anyhow!("tenantid is required"))?;
self.tenantid = Some(ZTenantId::from_str(ztenantid)?);
if let Some(app_name) = sm.params.get("application_name") {
self.appname = Some(app_name.clone());
}
@@ -48,12 +54,14 @@ impl postgres_backend::Handler for SendWalHandler {
if query_string.starts_with(b"START_WAL_PUSH") {
self.timeline.set(
&self.conf,
self.tenantid.unwrap(),
self.timelineid.unwrap(),
CreateControlFile::True,
)?;
} else {
self.timeline.set(
&self.conf,
self.tenantid.unwrap(),
self.timelineid.unwrap(),
CreateControlFile::False,
)?;
@@ -79,6 +87,7 @@ impl SendWalHandler {
SendWalHandler {
conf,
appname: None,
tenantid: None,
timelineid: None,
timeline: None,
}

View File

@@ -14,7 +14,7 @@ use std::sync::{Arc, Condvar, Mutex};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTimelineId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
use crate::safekeeper::{
@@ -258,9 +258,11 @@ pub trait TimelineTools {
fn set(
&mut self,
conf: &WalAcceptorConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<()>;
fn get(&self) -> &Arc<Timeline>;
}
@@ -268,13 +270,14 @@ impl TimelineTools for Option<Arc<Timeline>> {
fn set(
&mut self,
conf: &WalAcceptorConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<()> {
// We will only set the timeline once. If it were to ever change,
// anyone who cloned the Arc would be out of date.
assert!(self.is_none());
*self = Some(GlobalTimelines::get(conf, timeline_id, create)?);
*self = Some(GlobalTimelines::get(conf, tenant_id, timeline_id, create)?);
Ok(())
}
@@ -284,7 +287,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
}
lazy_static! {
pub static ref TIMELINES: Mutex<HashMap<ZTimelineId, Arc<Timeline>>> =
pub static ref TIMELINES: Mutex<HashMap<(ZTenantId, ZTimelineId), Arc<Timeline>>> =
Mutex::new(HashMap::new());
}
@@ -296,12 +299,13 @@ impl GlobalTimelines {
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &WalAcceptorConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<Arc<Timeline>> {
let mut timelines = TIMELINES.lock().unwrap();
match timelines.get(&timeline_id) {
match timelines.get(&(tenant_id, timeline_id)) {
Some(result) => Ok(Arc::clone(result)),
None => {
info!(
@@ -313,7 +317,7 @@ impl GlobalTimelines {
let shared_state = SharedState::create_restore(conf, timeline_id, create)?;
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
timelines.insert(timeline_id, Arc::clone(&new_tli));
timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli));
Ok(new_tli)
}
}