Stop WAL streaming threads, when compute node is shut down.

WAL stream uses the 2 connections:
1. Compute node (walproposer) -> Safekeeper (ReceiveWalConn module)

When compute node is shut down, safekeeper needs to stop the respective receiving thread.
Prior to this PR it didn't work because PostgresBackend haven't handled disconnection properly.

2. Safekeeper (ReplicationConn module) -> pageserver (walreceiver thread)

When incoming WAL stream is gone, safekeeper can stop streaming WAL and cancel connection as soon as replica is caught up.
Note that the WAL can be streamed to multiple replicas simultaneously, only disconnect ones that are caught up to the last_recieved_lsn.
This commit is contained in:
anastasia
2021-12-13 17:09:29 +03:00
committed by lubennikovaav
parent 90e5b6f983
commit 3b61f364f7
5 changed files with 101 additions and 22 deletions

View File

@@ -7,7 +7,9 @@ use bytes::Bytes;
use bytes::BytesMut;
use tracing::*;
use crate::timeline::Timeline;
use std::net::SocketAddr;
use std::sync::Arc;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
@@ -92,6 +94,9 @@ impl<'pg> ReceiveWalConn<'pg> {
_ => bail!("unexpected message {:?} instead of greeting", msg),
}
// Incoming WAL stream resumed, so reset information about the timeline pause.
swh.timeline.get().continue_streaming();
// if requested, ask pageserver to fetch wal from us
// as long as this wal_stream is alive, callmemaybe thread
// will send requests to pageserver
@@ -121,6 +126,7 @@ impl<'pg> ReceiveWalConn<'pg> {
tx: tx_clone,
tenant_id,
timelineid,
timeline: Arc::clone(swh.timeline.get()),
})
}
None => None,
@@ -144,10 +150,12 @@ struct SendWalHandlerGuard {
tx: UnboundedSender<CallmeEvent>,
tenant_id: ZTenantId,
timelineid: ZTimelineId,
timeline: Arc<Timeline>,
}
impl Drop for SendWalHandlerGuard {
fn drop(&mut self) {
self.timeline.stop_streaming();
self.tx
.send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
.unwrap_or_else(|e| {

View File

@@ -3,7 +3,7 @@
use crate::send_wal::SendWalHandler;
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use postgres_ffi::xlog_utils::{
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
@@ -30,8 +30,6 @@ use crate::callmemaybe::CallmeEvent;
use tokio::sync::mpsc::UnboundedSender;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
// See: https://www.postgresql.org/docs/13/protocol-replication.html
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
@@ -59,7 +57,7 @@ impl HotStandbyFeedback {
/// Standby status update
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // not used
pub write_lsn: Lsn, // last lsn received by pageserver
pub flush_lsn: Lsn, // not used
pub apply_lsn: Lsn, // pageserver's disk consistent lSN
pub reply_ts: TimestampTz,
@@ -99,7 +97,7 @@ impl Drop for ReplicationStreamGuard {
fn drop(&mut self) {
// the connection with pageserver is lost,
// resume callback subscription
info!("Connection to pageserver is gone. Subscribe to callmemeybe again. tenantid {} timelineid {}",
debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}",
self.tenant_id, self.timelineid);
self.tx
@@ -120,11 +118,14 @@ impl ReplicationConn {
/// Handle incoming messages from the network.
/// This is spawned into the background by `handle_start_replication`.
fn background_thread(mut stream_in: ReadStream, timeline: Arc<Timeline>) -> Result<()> {
fn background_thread(
mut stream_in: ReadStream,
timeline: Arc<Timeline>,
replica_id: usize,
) -> Result<()> {
let mut state = ReplicaState::new();
let replica = timeline.add_replica(state);
let _guard = ReplicationConnGuard {
replica,
replica: replica_id,
timeline: timeline.clone(),
};
// Wait for replica's feedback.
@@ -139,13 +140,14 @@ impl ReplicationConn {
// Note: deserializing is on m[1..] because we skip the tag byte.
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
timeline.update_replica_state(replica, Some(state));
timeline.update_replica_state(replica_id, Some(state));
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
.context("failed to deserialize StandbyReply")?;
state.disk_consistent_lsn = reply.apply_lsn;
timeline.update_replica_state(replica, Some(state));
state.last_received_lsn = reply.write_lsn;
timeline.update_replica_state(replica_id, Some(state));
}
_ => warn!("unexpected message {:?}", msg),
}
@@ -212,12 +214,16 @@ impl ReplicationConn {
let bg_timeline = Arc::clone(swh.timeline.get());
let bg_stream_in = self.stream_in.take().unwrap();
let state = ReplicaState::new();
// This replica_id is used below to check if it's time to stop replication.
let replica_id = bg_timeline.add_replica(state);
// TODO: here we got two threads, one for writing WAL and one for receiving
// feedback. If one of them fails, we should shutdown the other one too.
let _ = thread::Builder::new()
.name("HotStandbyFeedback thread".into())
.spawn(move || {
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) {
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
error!("Replication background thread failed: {}", err);
}
})
@@ -290,9 +296,17 @@ impl ReplicationConn {
end_pos = stop_pos;
} else {
/* Wait until we have some data to stream */
if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) {
end_pos = lsn
let lsn = swh.timeline.get().wait_for_lsn(start_pos);
if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// Is is time to end streaming to this replica?
if swh.timeline.get().check_stop_streaming(replica_id) {
// TODO create proper error type for this
bail!("end streaming to {:?}", swh.appname);
}
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
@@ -303,9 +317,6 @@ impl ReplicationConn {
continue;
}
}
if end_pos == END_REPLICATION_MARKER {
break;
}
// Take the `File` from `wal_file`, or open a new file.
let mut file = match wal_file.take() {
@@ -345,7 +356,7 @@ impl ReplicationConn {
start_pos += send_size as u64;
debug!("sent WAL up to {}", start_pos);
info!("sent WAL up to {}", start_pos);
// Decide whether to reuse this file. If we don't set wal_file here
// a new file will be opened next time.

View File

@@ -78,7 +78,9 @@ impl postgres_backend::Handler for SendWalHandler {
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
self.handle_identify_system(pgb)?;
} else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
ReplicationConn::new(pgb)
.run(self, pgb, &query_string)
.with_context(|| "failed to run ReplicationConn")?;
} else if query_string.starts_with(b"START_WAL_PUSH") {
// TODO: this repeats query decoding logic from page_service so it is probably
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes

View File

@@ -18,7 +18,7 @@ use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
use crate::replication::HotStandbyFeedback;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
Storage, SK_FORMAT_VERSION, SK_MAGIC,
@@ -38,6 +38,8 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Replica status: host standby feedback + disk consistent lsn
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
/// last known lsn received by replica
pub last_received_lsn: Lsn, // None means we don't know
/// combined disk_consistent_lsn of pageservers
pub disk_consistent_lsn: Lsn,
/// combined hot standby feedback from all replicas
@@ -53,6 +55,7 @@ impl Default for ReplicaState {
impl ReplicaState {
pub fn new() -> ReplicaState {
ReplicaState {
last_received_lsn: Lsn::MAX,
disk_consistent_lsn: Lsn(u64::MAX),
hs_feedback: HotStandbyFeedback {
ts: 0,
@@ -70,6 +73,9 @@ struct SharedState {
/// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about
notified_commit_lsn: Lsn,
// Set stop_lsn to inform WAL senders that it's time to stop sending WAL,
// so that it send all wal up stop_lsn and can safely exit streaming connections.
stop_lsn: Option<Lsn>,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
}
@@ -92,7 +98,7 @@ lazy_static! {
}
impl SharedState {
/// Get combined stateof all alive replicas
/// Get combined state of all alive replicas
pub fn get_replicas_state(&self) -> ReplicaState {
let mut acc = ReplicaState::new();
for state in self.replicas.iter().flatten() {
@@ -101,6 +107,8 @@ impl SharedState {
acc.hs_feedback.catalog_xmin =
min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
// currently not used, but update it to be consistent
acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn);
}
acc
}
@@ -141,6 +149,7 @@ impl SharedState {
Ok(Self {
notified_commit_lsn: Lsn(0),
stop_lsn: None,
sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state),
replicas: Vec::new(),
})
@@ -197,8 +206,52 @@ impl Timeline {
}
}
fn _stop_wal_senders(&self) {
self.notify_wal_senders(END_REPLICATION_MARKER);
// Notify WAL senders that it's time to stop sending WAL
pub fn stop_streaming(&self) {
let mut shared_state = self.mutex.lock().unwrap();
// Ensure that safekeeper sends WAL up to the last known committed LSN.
// It guarantees that pageserver will receive all the latest data
// before walservice disconnects.
shared_state.stop_lsn = Some(shared_state.notified_commit_lsn);
trace!(
"Stopping WAL senders. stop_lsn: {}",
shared_state.notified_commit_lsn
);
}
// Reset stop_lsn notification,
// so that WAL senders will continue sending WAL
pub fn continue_streaming(&self) {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.stop_lsn = None;
}
// Check if it's time to stop streaming to the given replica.
//
// Do not stop streaming until replica is caught up with the stop_lsn.
// This is not necessary for correctness, just an optimization to
// be able to remove WAL from safekeeper and decrease amount of work
// on the next start.
pub fn check_stop_streaming(&self, replica_id: usize) -> bool {
let shared_state = self.mutex.lock().unwrap();
// If stop_lsn is set, it's time to shutdown streaming.
if let Some(stop_lsn_request) = shared_state.stop_lsn {
let replica_state = shared_state.replicas[replica_id].unwrap();
// There is no data to stream, so other clauses don't matter.
if shared_state.notified_commit_lsn == Lsn(0) {
return true;
}
// Lsn::MAX means that we don't know the latest LSN yet.
// That may be a new replica, give it a chance to catch up.
if replica_state.last_received_lsn != Lsn::MAX
// If replica is fully caught up, disconnect it.
&& stop_lsn_request <= replica_state.last_received_lsn
{
return true;
}
}
false
}
/// Pass arrived message to the safekeeper.
@@ -220,6 +273,7 @@ impl Timeline {
let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
resp.disk_consistent_lsn = state.disk_consistent_lsn;
// XXX Do we need to add state.last_received_lsn to resp?
}
}
// Ping wal sender that new data might be available.

View File

@@ -426,6 +426,10 @@ impl PostgresBackend {
// send that in the ErrorResponse though, because it's not relevant to the
// compute node logs.
warn!("query handler for {:?} failed: {:#}", m.body, e);
if e.to_string().contains("failed to run") {
self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?;
return Ok(ProcessMsgResult::Break);
}
self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?;
}
self.write_message(&BeMessage::ReadyForQuery)?;