diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 611f36abed..2df08b126b 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -46,8 +46,8 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT let me_conf: Config = me_connstr.parse().unwrap(); let (host, port) = connection_host_port(&me_conf); let callme = format!( - "callmemaybe {} {} host={} port={} options='-c ztimelineid={}'", - tenantid, timelineid, host, port, timelineid, + "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'", + tenantid, timelineid, host, port, timelineid, tenantid, ); loop { diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index f72811fbea..e51658d733 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -6,14 +6,14 @@ use crate::receive_wal::ReceiveWalConn; use crate::replication::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use std::str::FromStr; use std::sync::Arc; use zenith_utils::postgres_backend; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor}; -use zenith_utils::zid::ZTimelineId; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::timeline::CreateControlFile; @@ -23,19 +23,25 @@ pub struct SendWalHandler { pub conf: WalAcceptorConf, /// assigned application name pub appname: Option, + pub tenantid: Option, pub timelineid: Option, pub timeline: Option>, } impl postgres_backend::Handler for SendWalHandler { fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> { - match sm.params.get("ztimelineid") { - Some(ref ztimelineid) => { - let ztlid = ZTimelineId::from_str(ztimelineid)?; - self.timelineid = Some(ztlid); - } - _ => bail!("timelineid is required"), - } + let ztimelineid = sm + .params + .get("ztimelineid") + .ok_or(anyhow!("timelineid is required"))?; + self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?); + + let ztenantid = sm + .params + .get("ztenantid") + .ok_or(anyhow!("tenantid is required"))?; + self.tenantid = Some(ZTenantId::from_str(ztenantid)?); + if let Some(app_name) = sm.params.get("application_name") { self.appname = Some(app_name.clone()); } @@ -48,12 +54,14 @@ impl postgres_backend::Handler for SendWalHandler { if query_string.starts_with(b"START_WAL_PUSH") { self.timeline.set( &self.conf, + self.tenantid.unwrap(), self.timelineid.unwrap(), CreateControlFile::True, )?; } else { self.timeline.set( &self.conf, + self.tenantid.unwrap(), self.timelineid.unwrap(), CreateControlFile::False, )?; @@ -79,6 +87,7 @@ impl SendWalHandler { SendWalHandler { conf, appname: None, + tenantid: None, timelineid: None, timeline: None, } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 17007fc146..f087a89f17 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, Condvar, Mutex}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; -use zenith_utils::zid::ZTimelineId; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; use crate::safekeeper::{ @@ -258,9 +258,11 @@ pub trait TimelineTools { fn set( &mut self, conf: &WalAcceptorConf, + tenant_id: ZTenantId, timeline_id: ZTimelineId, create: CreateControlFile, ) -> Result<()>; + fn get(&self) -> &Arc; } @@ -268,13 +270,14 @@ impl TimelineTools for Option> { fn set( &mut self, conf: &WalAcceptorConf, + tenant_id: ZTenantId, timeline_id: ZTimelineId, create: CreateControlFile, ) -> Result<()> { // We will only set the timeline once. If it were to ever change, // anyone who cloned the Arc would be out of date. assert!(self.is_none()); - *self = Some(GlobalTimelines::get(conf, timeline_id, create)?); + *self = Some(GlobalTimelines::get(conf, tenant_id, timeline_id, create)?); Ok(()) } @@ -284,7 +287,7 @@ impl TimelineTools for Option> { } lazy_static! { - pub static ref TIMELINES: Mutex>> = + pub static ref TIMELINES: Mutex>> = Mutex::new(HashMap::new()); } @@ -296,12 +299,13 @@ impl GlobalTimelines { /// If control file doesn't exist and create=false, bails out. pub fn get( conf: &WalAcceptorConf, + tenant_id: ZTenantId, timeline_id: ZTimelineId, create: CreateControlFile, ) -> Result> { let mut timelines = TIMELINES.lock().unwrap(); - match timelines.get(&timeline_id) { + match timelines.get(&(tenant_id, timeline_id)) { Some(result) => Ok(Arc::clone(result)), None => { info!( @@ -313,7 +317,7 @@ impl GlobalTimelines { let shared_state = SharedState::create_restore(conf, timeline_id, create)?; let new_tli = Arc::new(Timeline::new(timeline_id, shared_state)); - timelines.insert(timeline_id, Arc::clone(&new_tli)); + timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli)); Ok(new_tli) } }