walkeeper: use named type as a key in callmemaybe subscriptions hashmap

This commit is contained in:
Dmitry Rodionov
2022-01-17 22:34:35 +03:00
committed by Dmitry Rodionov
parent 39591ef627
commit 458bc0c838
4 changed files with 70 additions and 54 deletions

View File

@@ -64,18 +64,35 @@ pub fn thread_main(conf: SafeKeeperConf, rx: UnboundedReceiver<CallmeEvent>) ->
runtime.block_on(main_loop(conf, rx))
}
/// Named key identifying one callmemaybe subscription.
///
/// It is used as the key type of the subscriptions `HashMap` kept by `main_loop`
/// (hence the `Eq` + `Hash` derives) and is cloned when passed to the map's
/// entry API (hence `Clone`).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct SubscriptionStateKey {
    // Tenant id component of the key.
    tenant_id: ZTenantId,
    // Timeline id component of the key.
    timeline_id: ZTimelineId,
    // Pageserver connection string component of the key. Being a `String`, it is
    // the reason the key is not `Copy`.
    pageserver_connstr: String,
}
impl SubscriptionStateKey {
pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, pageserver_connstr: String) -> Self {
Self {
tenant_id,
timeline_id,
pageserver_connstr,
}
}
}
/// Messages to the callmemaybe thread.
///
/// They arrive through the `UnboundedReceiver<CallmeEvent>` passed to
/// `thread_main` / `main_loop`, which matches on each variant.
#[derive(Debug)]
pub enum CallmeEvent {
// Add a new subscription to the list.
// `main_loop` does nothing if a subscription with this key already exists.
Subscribe(ZTenantId, ZTimelineId, String),
Subscribe(SubscriptionStateKey),
// Remove the subscription from the list.
Unsubscribe(ZTenantId, ZTimelineId, String),
Unsubscribe(SubscriptionStateKey),
// Don't serve this subscription, but keep it in the list.
// If no subscription exists for the key, `main_loop` creates one and pauses it.
Pause(ZTenantId, ZTimelineId, String),
Pause(SubscriptionStateKey),
// Resume this subscription if it exists,
// but don't create a new one if it is gone.
Resume(ZTenantId, ZTimelineId, String),
Resume(SubscriptionStateKey),
// TODO how do we delete from subscriptions?
}
@@ -194,7 +211,7 @@ impl Drop for SubscriptionState {
}
pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEvent>) -> Result<()> {
let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId, String), SubscriptionState>> =
let subscriptions: Mutex<HashMap<SubscriptionStateKey, SubscriptionState>> =
Mutex::new(HashMap::new());
let mut ticker = tokio::time::interval(conf.recall_period);
@@ -204,11 +221,13 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
{
match request.context("done")?
{
CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
CallmeEvent::Subscribe(key) =>
{
let _enter = info_span!("callmemaybe: subscribe", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered();
let _enter = info_span!("callmemaybe: subscribe", timelineid = %key.timeline_id, tenantid = %key.tenant_id, pageserver_connstr=%key.pageserver_connstr.clone()).entered();
let mut subscriptions = subscriptions.lock().unwrap();
match subscriptions.entry((tenantid, timelineid, pageserver_connstr.clone())) {
// XXX this clone is ugly; is there a way to use the Borrow-trait trick with the entry API?
// Once we switch to a node id instead of the connection string, the key will be Copy and the clone will no longer be needed.
match subscriptions.entry(key.clone()) {
Entry::Occupied(_) => {
// Do nothing if subscription already exists
// If it is paused it means that there is already established replication connection.
@@ -220,29 +239,29 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
}
Entry::Vacant(entry) => {
let subscription = entry.insert(SubscriptionState::new(
tenantid,
timelineid,
pageserver_connstr,
key.tenant_id,
key.timeline_id,
key.pageserver_connstr,
));
subscription.call(conf.recall_period, conf.listen_pg_addr.clone());
}
}
},
CallmeEvent::Unsubscribe(tenantid, timelineid, pageserver_connstr) => {
let _enter = debug_span!("callmemaybe: unsubscribe", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered();
CallmeEvent::Unsubscribe(key) => {
let _enter = debug_span!("callmemaybe: unsubscribe", timelineid = %key.timeline_id, tenantid = %key.tenant_id, pageserver_connstr=%key.pageserver_connstr.clone()).entered();
debug!("unsubscribe");
let mut subscriptions = subscriptions.lock().unwrap();
subscriptions.remove(&(tenantid, timelineid, pageserver_connstr));
subscriptions.remove(&key);
},
CallmeEvent::Pause(tenantid, timelineid, pageserver_connstr) => {
let _enter = debug_span!("callmemaybe: pause", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered();
CallmeEvent::Pause(key) => {
let _enter = debug_span!("callmemaybe: pause", timelineid = %key.timeline_id, tenantid = %key.tenant_id, pageserver_connstr=%key.pageserver_connstr.clone()).entered();
let mut subscriptions = subscriptions.lock().unwrap();
// If pause received when no corresponding subscription exists it means that someone started replication
// without using callmemaybe. So we create subscription and pause it.
// In tenant relocation scenario subscribe call will be executed after pause when compute is restarted.
// In that case there is no need to create new/unpause existing subscription.
match subscriptions.entry((tenantid, timelineid, pageserver_connstr.clone())) {
match subscriptions.entry(key.clone()) {
Entry::Occupied(mut sub) => {
debug!("pause existing");
sub.get_mut().pause();
@@ -250,19 +269,21 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
Entry::Vacant(entry) => {
debug!("create paused");
let subscription = entry.insert(SubscriptionState::new(
tenantid,
timelineid,
pageserver_connstr,
key.tenant_id,
key.timeline_id,
key.pageserver_connstr,
));
subscription.pause();
}
}
},
CallmeEvent::Resume(tenantid, timelineid, pageserver_connstr) => {
debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={} pageserver_connstr={}",
timelineid, tenantid, pageserver_connstr);
CallmeEvent::Resume(key) => {
debug!(
"callmemaybe. thread_main. resume callback request for timelineid={} tenantid={} pageserver_connstr={}",
key.timeline_id, key.tenant_id, key.pageserver_connstr,
);
let mut subscriptions = subscriptions.lock().unwrap();
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid, pageserver_connstr))
if let Some(sub) = subscriptions.get_mut(&key)
{
sub.resume();
};
@@ -270,9 +291,10 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
}
},
_ = ticker.tick() => {
let _enter = debug_span!("callmemaybe: tick").entered();
let mut subscriptions = subscriptions.lock().unwrap();
for (&(_tenantid, _timelineid, _), state) in subscriptions.iter_mut() {
for (_, state) in subscriptions.iter_mut() {
state.call(conf.recall_period, conf.listen_pg_addr.clone());
}
},

View File

@@ -21,6 +21,7 @@ use zenith_utils::pq_proto::{BeMessage, FeMessage};
use zenith_utils::zid::ZTenantId;
use crate::callmemaybe::CallmeEvent;
use crate::callmemaybe::SubscriptionStateKey;
pub struct ReceiveWalConn<'pg> {
/// Postgres connection
@@ -104,13 +105,14 @@ impl<'pg> ReceiveWalConn<'pg> {
// Need to establish a replication channel with the page server.
// As replication in postgres is initiated by the receiver,
// we should use the callmemaybe mechanism.
let timelineid = spg.timeline.get().timelineid;
let timeline_id = spg.timeline.get().timeline_id;
let subscription_key = SubscriptionStateKey::new(
tenant_id,
timeline_id,
pageserver_connstr.to_owned(),
);
spg.tx
.send(CallmeEvent::Subscribe(
tenant_id,
timelineid,
pageserver_connstr.to_owned(),
))
.send(CallmeEvent::Subscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Subscribe request to callmemaybe thread {}",

View File

@@ -26,7 +26,7 @@ use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
use zenith_utils::sock_split::ReadStream;
use crate::callmemaybe::CallmeEvent;
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use tokio::sync::mpsc::UnboundedSender;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
@@ -90,7 +90,7 @@ impl Drop for ReplicationConnGuard {
struct ReplicationStreamGuard {
tx: UnboundedSender<CallmeEvent>,
tenant_id: ZTenantId,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
pageserver_connstr: String,
}
@@ -110,11 +110,7 @@ impl Drop for ReplicationStreamGuard {
);
self.tx
.send(CallmeEvent::Resume(
self.tenant_id,
self.timelineid,
self.pageserver_connstr.to_owned(),
))
.send(CallmeEvent::Resume(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Resume request to callmemaybe thread {}", e);
});
@@ -271,15 +267,13 @@ impl ReplicationConn {
None
} else {
let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let timelineid = spg.timeline.get().timelineid;
let timeline_id = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
let tx_clone = spg.tx.clone();
let subscription_key =
SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone());
spg.tx
.send(CallmeEvent::Pause(
tenant_id,
timelineid,
pageserver_connstr.clone(),
))
.send(CallmeEvent::Pause(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
@@ -288,7 +282,7 @@ impl ReplicationConn {
Some(ReplicationStreamGuard {
tx: tx_clone,
tenant_id,
timelineid,
timeline_id,
pageserver_connstr,
})
}
@@ -318,15 +312,13 @@ impl ReplicationConn {
// this expect should never fail because in wal_proposer_recovery mode stop_pos is set
// and this code is not reachable
let pageserver_connstr = pageserver_connstr
.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
.expect("there should be a pageserver connection string");
let timelineid = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
let subscription_key =
SubscriptionStateKey::new(tenant_id, timelineid, pageserver_connstr);
spg.tx
.send(CallmeEvent::Unsubscribe(
tenant_id,
timelineid,
pageserver_connstr,
))
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
@@ -352,7 +344,7 @@ impl ReplicationConn {
// Open a new file.
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let timeline_id = spg.timeline.get().timelineid;
let timeline_id = spg.timeline.get().timeline_id;
let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name);
Self::open_wal_file(&wal_file_path)?
}

View File

@@ -166,7 +166,7 @@ impl SharedState {
/// Database instance (tenant)
pub struct Timeline {
pub timelineid: ZTimelineId,
pub timeline_id: ZTimelineId,
mutex: Mutex<SharedState>,
/// condition variable used to notify wal senders
cond: Condvar,
@@ -175,7 +175,7 @@ pub struct Timeline {
impl Timeline {
fn new(timelineid: ZTimelineId, shared_state: SharedState) -> Timeline {
Timeline {
timelineid,
timeline_id: timelineid,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}