Centralize suspending/resuming timeline activity on safekeepers.

A timeline is active whenever there is at least one compute connection or the
pageserver is not caught up. Currently 'active' means that callmemaybes are
being sent.

Fixes a race: the suspend condition check and the callmemaybe unsubscribe now
happen under the same lock.
Author: Arseny Sher
Date: 2022-01-10 23:48:34 +03:00
Parent: d69b0539ba
Commit: 729ac38ea8
3 changed files with 213 additions and 144 deletions
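
To see the race this fixes: previously the walsender checked the stop condition under the timeline lock, released it, and only then sent the callmemaybe Unsubscribe, so a compute connection arriving in between could be left without a subscription. A minimal sketch, using hypothetical simplified types rather than the patch's actual SharedState, of why doing the check and the state change under one mutex closes that window:

    use std::sync::Mutex;

    struct State {
        active: bool,
        num_computes: u32,
        pageserver_caught_up: bool,
    }

    struct Timeline {
        mutex: Mutex<State>,
    }

    impl Timeline {
        // Suspend check and deactivation happen under ONE lock acquisition:
        // on_compute_connect() takes the same mutex, so no compute can
        // register between the condition check and the unsubscribe.
        fn check_deactivate(&self) -> bool {
            let mut state = self.mutex.lock().unwrap();
            if state.active && state.num_computes == 0 && state.pageserver_caught_up {
                // the callmemaybe Unsubscribe would be sent here, still under the lock
                state.active = false;
                return true;
            }
            false
        }

        fn on_compute_connect(&self) {
            let mut state = self.mutex.lock().unwrap();
            state.num_computes += 1;
            state.active = true; // the Subscribe would be sent here
        }
    }

Since both paths contend on the same mutex, every interleaving is equivalent to one running entirely before the other.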

File 1 of 3: WAL receiver (ReceiveWalConn)

@@ -5,6 +5,7 @@
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use bytes::BytesMut;
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
use crate::timeline::Timeline;
@@ -18,10 +19,8 @@ use crate::handler::SafekeeperPostgresHandler;
use crate::timeline::TimelineTools;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage};
use zenith_utils::zid::ZTenantId;
use crate::callmemaybe::CallmeEvent;
use crate::callmemaybe::SubscriptionStateKey;
pub struct ReceiveWalConn<'pg> {
/// Postgres connection
@@ -82,50 +81,23 @@ impl<'pg> ReceiveWalConn<'pg> {
let mut msg = self
.read_msg()
.context("failed to receive proposer greeting")?;
let tenant_id: ZTenantId;
match msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with wal proposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
);
tenant_id = greeting.tenant_id;
}
_ => bail!("unexpected message {:?} instead of greeting", msg),
}
// Incoming WAL stream resumed, so reset information about the timeline pause.
spg.timeline.get().continue_streaming();
// if requested, ask pageserver to fetch wal from us
// as long as this wal_stream is alive, callmemaybe thread
// will send requests to pageserver
let _guard = match self.pageserver_connstr {
Some(ref pageserver_connstr) => {
// Need to establish a replication channel with the pageserver.
// As replication in postgres is initiated by the receiver,
// we should use the callmemaybe mechanism.
let timeline_id = spg.timeline.get().zttid.timeline_id;
let subscription_key = SubscriptionStateKey::new(
tenant_id,
timeline_id,
pageserver_connstr.to_owned(),
);
spg.tx
.send(CallmeEvent::Subscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Subscribe request to callmemaybe thread {}",
e
);
});
// create a guard to unsubscribe the callback when this wal_stream exits
Some(SendWalHandlerGuard {
timeline: Arc::clone(spg.timeline.get()),
})
}
None => None,
// Register the connection and defer unregister.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref(), &spg.tx)?;
let _guard = ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
callmemaybe_tx: spg.tx.clone(),
};
loop {
@@ -142,12 +114,15 @@ impl<'pg> ReceiveWalConn<'pg> {
}
}
struct SendWalHandlerGuard {
struct ComputeConnectionGuard {
timeline: Arc<Timeline>,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
}
impl Drop for SendWalHandlerGuard {
impl Drop for ComputeConnectionGuard {
fn drop(&mut self) {
self.timeline.stop_streaming();
self.timeline
.on_compute_disconnect(&self.callmemaybe_tx)
.unwrap();
}
}
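
The renamed guard keeps the RAII shape: the connection is registered up front and deregistration is deferred to Drop, so it runs on every exit path of the handler, including error returns. A standalone sketch of the pattern, with hypothetical types not taken from the patch:

    use std::sync::Mutex;

    // Hypothetical connection registry: new() registers, Drop deregisters,
    // so early returns and `?` propagation cannot leak a registration.
    struct ConnGuard<'a> {
        registry: &'a Mutex<u32>,
    }

    impl<'a> ConnGuard<'a> {
        fn new(registry: &'a Mutex<u32>) -> Self {
            *registry.lock().unwrap() += 1;
            ConnGuard { registry }
        }
    }

    impl<'a> Drop for ConnGuard<'a> {
        fn drop(&mut self) {
            // runs whenever the guard goes out of scope
            *self.registry.lock().unwrap() -= 1;
        }
    }

One caveat visible above: Drop calls on_compute_disconnect(...).unwrap(), so a closed callmemaybe channel becomes a panic during cleanup; the doc comment on that method treats a closed channel to a static thread as already abnormal.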

File 2 of 3: WAL sender (ReplicationConn)

@@ -283,12 +283,14 @@ impl ReplicationConn {
if spg.appname == Some("wal_proposer_recovery".to_string()) {
None
} else {
let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let tenant_id = spg.ztenantid.unwrap();
let timeline_id = spg.timeline.get().zttid.timeline_id;
let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let zttid = spg.timeline.get().zttid;
let tx_clone = spg.tx.clone();
let subscription_key =
SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone());
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.clone(),
);
spg.tx
.send(CallmeEvent::Pause(subscription_key))
.unwrap_or_else(|e| {
@@ -298,8 +300,8 @@ impl ReplicationConn {
// create a guard to subscribe the callback again when this connection exits
Some(ReplicationStreamGuard {
tx: tx_clone,
tenant_id,
timeline_id,
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
pageserver_connstr,
})
}
@@ -324,21 +326,10 @@ impl ReplicationConn {
if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// Is it time to end streaming to this replica?
if spg.timeline.get().check_stop_streaming(replica_id) {
// this expect should never fail because in wal_proposer_recovery mode stop_pos is set
// and this code is not reachable
let pageserver_connstr = pageserver_connstr
.expect("there should be a pageserver connection string");
let tenant_id = spg.ztenantid.unwrap();
let timeline_id = spg.timeline.get().zttid.timeline_id;
let subscription_key =
SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr);
spg.tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
// TODO: also check once in a while whether we are the walsender
// to the right pageserver.
if spg.timeline.get().check_deactivate(replica_id, &spg.tx)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);
}
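
This also rewires how streaming ends: instead of consulting the removed per-timeline stop_lsn, the walsender asks check_deactivate() whenever it runs out of WAL, and a positive answer deliberately errors out so the connection unwinds through its guards. A simplified, hypothetical restatement of the loop shape, with closures standing in for the real WAL plumbing:

    use anyhow::{bail, Result};

    // Hypothetical send-loop skeleton (not the patch's actual code): when no
    // new LSN arrives, ask the timeline whether it is time to suspend; the
    // bail!() unwinds the connection, firing any registered Drop guards.
    fn send_loop(
        mut next_lsn: impl FnMut() -> Result<Option<u64>>,
        mut send_up_to: impl FnMut(u64) -> Result<()>,
        mut check_deactivate: impl FnMut() -> Result<bool>,
    ) -> Result<()> {
        loop {
            match next_lsn()? {
                Some(lsn) => send_up_to(lsn)?,
                None => {
                    if check_deactivate()? {
                        bail!("timeline suspended, ending streaming");
                    }
                }
            }
        }
    }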

File 3 of 3: timeline state (Timeline and SharedState)

@@ -12,12 +12,14 @@ use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantTimelineId;
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
Storage, SK_FORMAT_VERSION, SK_MAGIC,
@@ -70,18 +72,25 @@ impl ReplicaState {
}
}
/// Shared state associated with database instance (tenant)
/// Shared state associated with database instance
struct SharedState {
/// Safekeeper object
sk: SafeKeeper<FileStorage>,
/// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about
notified_commit_lsn: Lsn,
// Set stop_lsn to inform WAL senders that it's time to stop sending WAL,
// so that they send all WAL up to stop_lsn and can safely exit streaming connections.
stop_lsn: Option<Lsn>,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// Inactive clusters shouldn't occupy any resources, so the timeline is
/// activated whenever there is a compute connection or the pageserver is
/// not caught up (it must have the latest WAL for a new compute start),
/// and suspended otherwise.
///
/// TODO: it might be better to remove tli completely from GlobalTimelines
/// when tli is inactive instead of having this flag.
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
}
// A named boolean.
@@ -102,6 +111,118 @@ lazy_static! {
}
impl SharedState {
/// Restore SharedState from control file.
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let state = FileStorage::load_control_file_conf(conf, zttid, create)
.context("failed to load from control file")?;
let file_storage = FileStorage::new(zttid, conf);
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(zttid);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
true,
state.wal_start_lsn,
)?
.0
} else {
0
};
Ok(Self {
notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state),
replicas: Vec::new(),
active: false,
num_computes: 0,
pageserver_connstr: None,
})
}
/// Activate the timeline: start/change walsender (via callmemaybe).
fn activate(
&mut self,
zttid: &ZTenantTimelineId,
pageserver_connstr: Option<&String>,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
// Unsubscribe the old subscription. XXX: callmemaybe is on its way out.
let old_subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(old_subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
}
if let Some(pageserver_connstr) = pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
// XXX: sending to a channel under a lock is not great, but it
// shouldn't be a problem here. If it is, we can grab a counter
// here and later augment channel messages with it.
callmemaybe_tx
.send(CallmeEvent::Subscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Subscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is subscribed to callmemaybe to {}",
zttid.timeline_id, pageserver_connstr
);
}
self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
self.active = true;
Ok(())
}
/// Deactivate the timeline: stop callmemaybe.
fn deactivate(
&mut self,
zttid: &ZTenantTimelineId,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if self.active {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is unsubscribed from callmemaybe to {}",
zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap()
);
}
self.active = false;
}
Ok(())
}
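
Taken together, activate() and deactivate() form a small subscription state machine keyed on (active, pageserver_connstr): activating with a new connstr first unsubscribes the old one, and deactivating unsubscribes only while a subscription exists. A hypothetical condensed model of just that switching logic, with plain callbacks in place of the callmemaybe channel:

    #[derive(Default)]
    struct SubState {
        active: bool,
        connstr: Option<String>,
    }

    impl SubState {
        fn activate(&mut self, new: Option<&str>, unsub: impl Fn(&str), sub: impl Fn(&str)) {
            if let Some(old) = &self.connstr {
                unsub(old); // drop the stale subscription first
            }
            if let Some(new) = new {
                sub(new);
            }
            self.connstr = new.map(str::to_owned);
            self.active = true;
        }

        fn deactivate(&mut self, unsub: impl Fn(&str)) {
            if self.active {
                if let Some(c) = &self.connstr {
                    unsub(c);
                }
                self.active = false;
            }
        }
    }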
/// Get combined state of all alive replicas
pub fn get_replicas_state(&self) -> ReplicaState {
let mut acc = ReplicaState::new();
@@ -159,37 +280,6 @@ impl SharedState {
self.replicas.push(Some(state));
pos
}
/// Restore SharedState from control file.
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let state = FileStorage::load_control_file_conf(conf, zttid, create)
.context("failed to load from control file")?;
let file_storage = FileStorage::new(zttid, conf);
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(zttid);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
true,
state.wal_start_lsn,
)?
.0
} else {
0
};
Ok(Self {
notified_commit_lsn: Lsn(0),
stop_lsn: None,
sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state),
replicas: Vec::new(),
})
}
}
/// Database instance (tenant)
@@ -209,6 +299,67 @@ impl Timeline {
}
}
/// Register a compute connection, starting timeline-related activity if it
/// is not running yet.
/// Can fail only if the channel to a static thread is closed, which should
/// never happen.
pub fn on_compute_connect(
&self,
pageserver_connstr: Option<&String>,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
// FIXME: currently we always adopt the latest pageserver connstr, but we
// should have some kind of generation assigned by compute to distinguish
// the latest one, or even pass it through consensus to reliably deliver
// it to all safekeepers.
shared_state.activate(&self.zttid, pageserver_connstr, callmemaybe_tx)?;
Ok(())
}
/// De-register a compute connection, shutting down timeline activity if the
/// pageserver doesn't need a catchup.
/// Can fail only if the channel to a static thread is closed, which should
/// never happen.
pub fn on_compute_disconnect(
&self,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
// If there is no pageserver, can suspend right away; otherwise let
// walsender do that.
if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() {
shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
}
Ok(())
}
/// Deactivate the tenant if there are no computes and the pageserver is
/// caught up, assuming the pageserver status is tracked in replica_id.
/// Returns true if deactivated.
pub fn check_deactivate(
&self,
replica_id: usize,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
// already suspended
return Ok(true);
}
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
(replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.last_received_lsn >= shared_state.sk.commit_lsn);
if deactivate {
shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
return Ok(true);
}
}
Ok(false)
}
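
The suspend predicate is worth reading closely: a timeline with no data yet (notified_commit_lsn still 0) may suspend immediately, while a replica reporting Lsn::MAX means "position unknown yet" and keeps the timeline active. A pure-function restatement, as a hypothetical helper with u64 standing in for Lsn:

    // Hypothetical restatement of the predicate inside check_deactivate();
    // u64::MAX stands in for Lsn::MAX ("replica position not known yet").
    fn should_deactivate(
        num_computes: u32,
        notified_commit_lsn: u64,
        replica_last_received_lsn: u64,
        commit_lsn: u64,
    ) -> bool {
        num_computes == 0
            && (notified_commit_lsn == 0 // no data at all yet
                || (replica_last_received_lsn != u64::MAX
                    && replica_last_received_lsn >= commit_lsn))
    }

    // For example, a brand-new replica reporting u64::MAX keeps the timeline active:
    // assert!(!should_deactivate(0, 100, u64::MAX, 100));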
/// Timed wait for an LSN to be committed.
///
/// Returns the last committed LSN, which will be at least
@@ -242,54 +393,6 @@ impl Timeline {
}
}
// Notify WAL senders that it's time to stop sending WAL
pub fn stop_streaming(&self) {
let mut shared_state = self.mutex.lock().unwrap();
// Ensure that safekeeper sends WAL up to the last known committed LSN.
// It guarantees that pageserver will receive all the latest data
// before walservice disconnects.
shared_state.stop_lsn = Some(shared_state.notified_commit_lsn);
trace!(
"Stopping WAL senders. stop_lsn: {}",
shared_state.notified_commit_lsn
);
}
// Reset stop_lsn notification,
// so that WAL senders will continue sending WAL
pub fn continue_streaming(&self) {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.stop_lsn = None;
}
// Check if it's time to stop streaming to the given replica.
//
// Do not stop streaming until the replica has caught up with stop_lsn.
// This is not necessary for correctness, just an optimization that lets
// us remove WAL from the safekeeper and decreases the amount of work
// on the next start.
pub fn check_stop_streaming(&self, replica_id: usize) -> bool {
let shared_state = self.mutex.lock().unwrap();
// If stop_lsn is set, it's time to shutdown streaming.
if let Some(stop_lsn_request) = shared_state.stop_lsn {
let replica_state = shared_state.replicas[replica_id].unwrap();
// There is no data to stream, so other clauses don't matter.
if shared_state.notified_commit_lsn == Lsn(0) {
return true;
}
// Lsn::MAX means that we don't know the latest LSN yet.
// That may be a new replica, give it a chance to catch up.
if replica_state.last_received_lsn != Lsn::MAX
// If replica is fully caught up, disconnect it.
&& stop_lsn_request <= replica_state.last_received_lsn
{
return true;
}
}
false
}
/// Pass arrived message to the safekeeper.
pub fn process_msg(
&self,