mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
send CallmeEvent::Unsubscribe request only when pageserver is caught up with safekeeper and it's time to stop streaming
This commit is contained in:
@@ -25,7 +25,7 @@ async fn request_callback(
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
) -> Result<()> {
|
||||
debug!(
|
||||
info!(
|
||||
"callmemaybe request_callback Connecting to pageserver {}",
|
||||
&pageserver_connstr
|
||||
);
|
||||
@@ -75,6 +75,7 @@ pub enum CallmeEvent {
|
||||
Resume(ZTenantId, ZTimelineId),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SubscriptionState {
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
@@ -136,7 +137,7 @@ impl SubscriptionState {
|
||||
fn call(&mut self, recall_period: Duration, listen_pg_addr: String) {
|
||||
// Ignore call request if this subscription is paused
|
||||
if self.paused {
|
||||
debug!(
|
||||
info!(
|
||||
"ignore call request for paused subscription
|
||||
tenantid: {}, timelineid: {}",
|
||||
self.tenantid, self.timelineid
|
||||
@@ -146,7 +147,7 @@ impl SubscriptionState {
|
||||
|
||||
// Check if it too early to recall
|
||||
if self.handle.is_some() && self.last_call_time.elapsed() < recall_period {
|
||||
debug!(
|
||||
info!(
|
||||
"too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?}
|
||||
tenantid: {}, timelineid: {}",
|
||||
self.last_call_time, recall_period, self.tenantid, self.timelineid
|
||||
@@ -174,7 +175,7 @@ impl SubscriptionState {
|
||||
|
||||
// Update last_call_time
|
||||
self.last_call_time = Instant::now();
|
||||
debug!(
|
||||
info!(
|
||||
"new call spawned. time {:?}
|
||||
tenantid: {}, timelineid: {}",
|
||||
self.last_call_time, self.tenantid, self.timelineid
|
||||
@@ -201,18 +202,22 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
|
||||
CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
|
||||
{
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
if let Some(sub) = subscriptions.get(&(tenantid, timelineid))
|
||||
{
|
||||
info!("callmemaybe. subscription already exists {:?}", sub);
|
||||
}
|
||||
if let Some(mut sub) = subscriptions.insert((tenantid, timelineid),
|
||||
SubscriptionState::new(tenantid, timelineid, pageserver_connstr))
|
||||
{
|
||||
sub.call(conf.recall_period, conf.listen_pg_addr.clone());
|
||||
}
|
||||
debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
|
||||
info!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
CallmeEvent::Unsubscribe(tenantid, timelineid) => {
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
subscriptions.remove(&(tenantid, timelineid));
|
||||
debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}",
|
||||
info!("callmemaybe. thread_main. unsubscribe callback. request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
CallmeEvent::Pause(tenantid, timelineid) => {
|
||||
@@ -221,7 +226,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
|
||||
{
|
||||
sub.pause();
|
||||
};
|
||||
debug!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
|
||||
info!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
CallmeEvent::Resume(tenantid, timelineid) => {
|
||||
@@ -232,7 +237,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
|
||||
sub.call(conf.recall_period, conf.listen_pg_addr.clone());
|
||||
};
|
||||
|
||||
debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
|
||||
info!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
}
|
||||
|
||||
@@ -123,9 +123,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
|
||||
// create a guard to unsubscribe callback, when this wal_stream will exit
|
||||
Some(SendWalHandlerGuard {
|
||||
tx: tx_clone,
|
||||
tenant_id,
|
||||
timelineid,
|
||||
_tx: tx_clone,
|
||||
_tenant_id: tenant_id,
|
||||
_timelineid: timelineid,
|
||||
timeline: Arc::clone(spg.timeline.get()),
|
||||
})
|
||||
}
|
||||
@@ -147,22 +147,22 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
struct SendWalHandlerGuard {
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
tenant_id: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
_tx: UnboundedSender<CallmeEvent>,
|
||||
_tenant_id: ZTenantId,
|
||||
_timelineid: ZTimelineId,
|
||||
timeline: Arc<Timeline>,
|
||||
}
|
||||
|
||||
impl Drop for SendWalHandlerGuard {
|
||||
fn drop(&mut self) {
|
||||
self.timeline.stop_streaming();
|
||||
self.tx
|
||||
.send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
"failed to send Unsubscribe request to callmemaybe thread {}",
|
||||
e
|
||||
);
|
||||
});
|
||||
// self.tx
|
||||
// .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
|
||||
// .unwrap_or_else(|e| {
|
||||
// error!(
|
||||
// "failed to send Unsubscribe request to callmemaybe thread {}",
|
||||
// e
|
||||
// );
|
||||
// });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,6 +290,13 @@ impl ReplicationConn {
|
||||
} else {
|
||||
// Is is time to end streaming to this replica?
|
||||
if spg.timeline.get().check_stop_streaming(replica_id) {
|
||||
let timelineid = spg.timeline.get().timelineid;
|
||||
let tenant_id = spg.ztenantid.unwrap();
|
||||
spg.tx
|
||||
.send(CallmeEvent::Unsubscribe(tenant_id, timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Pause request to callmemaybe thread {}", e);
|
||||
});
|
||||
// TODO create proper error type for this
|
||||
bail!("end streaming to {:?}", spg.appname);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user