From 3b61f364f73b136012594af77a7b42ef73abe1c0 Mon Sep 17 00:00:00 2001 From: anastasia Date: Mon, 13 Dec 2021 17:09:29 +0300 Subject: [PATCH] Stop WAL streaming threads, when compute node is shut down. WAL stream uses the 2 connections: 1. Compute node (walproposer) -> Safekeeper (ReceiveWalConn module) When compute node is shut down, safekeeper needs to stop the respective receiving thread. Prior to this PR it didn't work because PostgresBackend haven't handled disconnection properly. 2. Safekeeper (ReplicationConn module) -> pageserver (walreceiver thread) When incoming WAL stream is gone, safekeeper can stop streaming WAL and cancel connection as soon as replica is caught up. Note that the WAL can be streamed to multiple replicas simultaneously, only disconnect ones that are caught up to the last_recieved_lsn. --- walkeeper/src/receive_wal.rs | 8 ++++ walkeeper/src/replication.rs | 45 ++++++++++++-------- walkeeper/src/send_wal.rs | 4 +- walkeeper/src/timeline.rs | 62 ++++++++++++++++++++++++++-- zenith_utils/src/postgres_backend.rs | 4 ++ 5 files changed, 101 insertions(+), 22 deletions(-) diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 6a3ab62574..c02e0eeff4 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -7,7 +7,9 @@ use bytes::Bytes; use bytes::BytesMut; use tracing::*; +use crate::timeline::Timeline; use std::net::SocketAddr; +use std::sync::Arc; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; @@ -92,6 +94,9 @@ impl<'pg> ReceiveWalConn<'pg> { _ => bail!("unexpected message {:?} instead of greeting", msg), } + // Incoming WAL stream resumed, so reset information about the timeline pause. + swh.timeline.get().continue_streaming(); + // if requested, ask pageserver to fetch wal from us // as long as this wal_stream is alive, callmemaybe thread // will send requests to pageserver @@ -121,6 +126,7 @@ impl<'pg> ReceiveWalConn<'pg> { tx: tx_clone, tenant_id, timelineid, + timeline: Arc::clone(swh.timeline.get()), }) } None => None, @@ -144,10 +150,12 @@ struct SendWalHandlerGuard { tx: UnboundedSender, tenant_id: ZTenantId, timelineid: ZTimelineId, + timeline: Arc, } impl Drop for SendWalHandlerGuard { fn drop(&mut self) { + self.timeline.stop_streaming(); self.tx .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid)) .unwrap_or_else(|e| { diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index f284be3aed..223a3f231e 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -3,7 +3,7 @@ use crate::send_wal::SendWalHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; use postgres_ffi::xlog_utils::{ get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, @@ -30,8 +30,6 @@ use crate::callmemaybe::CallmeEvent; use tokio::sync::mpsc::UnboundedSender; use zenith_utils::zid::{ZTenantId, ZTimelineId}; -pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; - // See: https://www.postgresql.org/docs/13/protocol-replication.html const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; @@ -59,7 +57,7 @@ impl HotStandbyFeedback { /// Standby status update #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { - pub write_lsn: Lsn, // not used + pub write_lsn: Lsn, // last lsn received by pageserver pub flush_lsn: Lsn, // not used pub apply_lsn: Lsn, // pageserver's disk consistent lSN pub reply_ts: TimestampTz, @@ -99,7 +97,7 @@ impl Drop for ReplicationStreamGuard { fn drop(&mut self) { // the connection with pageserver is lost, // resume callback subscription - info!("Connection to pageserver is gone. Subscribe to callmemeybe again. tenantid {} timelineid {}", + debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}", self.tenant_id, self.timelineid); self.tx @@ -120,11 +118,14 @@ impl ReplicationConn { /// Handle incoming messages from the network. /// This is spawned into the background by `handle_start_replication`. - fn background_thread(mut stream_in: ReadStream, timeline: Arc) -> Result<()> { + fn background_thread( + mut stream_in: ReadStream, + timeline: Arc, + replica_id: usize, + ) -> Result<()> { let mut state = ReplicaState::new(); - let replica = timeline.add_replica(state); let _guard = ReplicationConnGuard { - replica, + replica: replica_id, timeline: timeline.clone(), }; // Wait for replica's feedback. @@ -139,13 +140,14 @@ impl ReplicationConn { // Note: deserializing is on m[1..] because we skip the tag byte. state.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; - timeline.update_replica_state(replica, Some(state)); + timeline.update_replica_state(replica_id, Some(state)); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; state.disk_consistent_lsn = reply.apply_lsn; - timeline.update_replica_state(replica, Some(state)); + state.last_received_lsn = reply.write_lsn; + timeline.update_replica_state(replica_id, Some(state)); } _ => warn!("unexpected message {:?}", msg), } @@ -212,12 +214,16 @@ impl ReplicationConn { let bg_timeline = Arc::clone(swh.timeline.get()); let bg_stream_in = self.stream_in.take().unwrap(); + let state = ReplicaState::new(); + // This replica_id is used below to check if it's time to stop replication. + let replica_id = bg_timeline.add_replica(state); + // TODO: here we got two threads, one for writing WAL and one for receiving // feedback. If one of them fails, we should shutdown the other one too. let _ = thread::Builder::new() .name("HotStandbyFeedback thread".into()) .spawn(move || { - if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) { + if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) { error!("Replication background thread failed: {}", err); } }) @@ -290,9 +296,17 @@ impl ReplicationConn { end_pos = stop_pos; } else { /* Wait until we have some data to stream */ - if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) { - end_pos = lsn + let lsn = swh.timeline.get().wait_for_lsn(start_pos); + + if let Some(lsn) = lsn { + end_pos = lsn; } else { + // Is is time to end streaming to this replica? + if swh.timeline.get().check_stop_streaming(replica_id) { + // TODO create proper error type for this + bail!("end streaming to {:?}", swh.appname); + } + // timeout expired: request pageserver status pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { sent_ptr: end_pos.0, @@ -303,9 +317,6 @@ impl ReplicationConn { continue; } } - if end_pos == END_REPLICATION_MARKER { - break; - } // Take the `File` from `wal_file`, or open a new file. let mut file = match wal_file.take() { @@ -345,7 +356,7 @@ impl ReplicationConn { start_pos += send_size as u64; - debug!("sent WAL up to {}", start_pos); + info!("sent WAL up to {}", start_pos); // Decide whether to reuse this file. If we don't set wal_file here // a new file will be opened next time. diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index ce90fcf310..b379f5ce1e 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -78,7 +78,9 @@ impl postgres_backend::Handler for SendWalHandler { if query_string.starts_with(b"IDENTIFY_SYSTEM") { self.handle_identify_system(pgb)?; } else if query_string.starts_with(b"START_REPLICATION") { - ReplicationConn::new(pgb).run(self, pgb, &query_string)?; + ReplicationConn::new(pgb) + .run(self, pgb, &query_string) + .with_context(|| "failed to run ReplicationConn")?; } else if query_string.starts_with(b"START_WAL_PUSH") { // TODO: this repeats query decoding logic from page_service so it is probably // a good idea to refactor it in pgbackend and pass string to process query instead of bytes diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index fece685a57..b8063db5d4 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -18,7 +18,7 @@ use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::zid::{ZTenantId, ZTimelineId}; -use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; +use crate::replication::HotStandbyFeedback; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, Storage, SK_FORMAT_VERSION, SK_MAGIC, @@ -38,6 +38,8 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); /// Replica status: host standby feedback + disk consistent lsn #[derive(Debug, Clone, Copy)] pub struct ReplicaState { + /// last known lsn received by replica + pub last_received_lsn: Lsn, // None means we don't know /// combined disk_consistent_lsn of pageservers pub disk_consistent_lsn: Lsn, /// combined hot standby feedback from all replicas @@ -53,6 +55,7 @@ impl Default for ReplicaState { impl ReplicaState { pub fn new() -> ReplicaState { ReplicaState { + last_received_lsn: Lsn::MAX, disk_consistent_lsn: Lsn(u64::MAX), hs_feedback: HotStandbyFeedback { ts: 0, @@ -70,6 +73,9 @@ struct SharedState { /// For receiving-sending wal cooperation /// quorum commit LSN we've notified walsenders about notified_commit_lsn: Lsn, + // Set stop_lsn to inform WAL senders that it's time to stop sending WAL, + // so that it send all wal up stop_lsn and can safely exit streaming connections. + stop_lsn: Option, /// State of replicas replicas: Vec>, } @@ -92,7 +98,7 @@ lazy_static! { } impl SharedState { - /// Get combined stateof all alive replicas + /// Get combined state of all alive replicas pub fn get_replicas_state(&self) -> ReplicaState { let mut acc = ReplicaState::new(); for state in self.replicas.iter().flatten() { @@ -101,6 +107,8 @@ impl SharedState { acc.hs_feedback.catalog_xmin = min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin); acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn); + // currently not used, but update it to be consistent + acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn); } acc } @@ -141,6 +149,7 @@ impl SharedState { Ok(Self { notified_commit_lsn: Lsn(0), + stop_lsn: None, sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state), replicas: Vec::new(), }) @@ -197,8 +206,52 @@ impl Timeline { } } - fn _stop_wal_senders(&self) { - self.notify_wal_senders(END_REPLICATION_MARKER); + // Notify WAL senders that it's time to stop sending WAL + pub fn stop_streaming(&self) { + let mut shared_state = self.mutex.lock().unwrap(); + // Ensure that safekeeper sends WAL up to the last known committed LSN. + // It guarantees that pageserver will receive all the latest data + // before walservice disconnects. + shared_state.stop_lsn = Some(shared_state.notified_commit_lsn); + trace!( + "Stopping WAL senders. stop_lsn: {}", + shared_state.notified_commit_lsn + ); + } + + // Reset stop_lsn notification, + // so that WAL senders will continue sending WAL + pub fn continue_streaming(&self) { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.stop_lsn = None; + } + + // Check if it's time to stop streaming to the given replica. + // + // Do not stop streaming until replica is caught up with the stop_lsn. + // This is not necessary for correctness, just an optimization to + // be able to remove WAL from safekeeper and decrease amount of work + // on the next start. + pub fn check_stop_streaming(&self, replica_id: usize) -> bool { + let shared_state = self.mutex.lock().unwrap(); + + // If stop_lsn is set, it's time to shutdown streaming. + if let Some(stop_lsn_request) = shared_state.stop_lsn { + let replica_state = shared_state.replicas[replica_id].unwrap(); + // There is no data to stream, so other clauses don't matter. + if shared_state.notified_commit_lsn == Lsn(0) { + return true; + } + // Lsn::MAX means that we don't know the latest LSN yet. + // That may be a new replica, give it a chance to catch up. + if replica_state.last_received_lsn != Lsn::MAX + // If replica is fully caught up, disconnect it. + && stop_lsn_request <= replica_state.last_received_lsn + { + return true; + } + } + false } /// Pass arrived message to the safekeeper. @@ -220,6 +273,7 @@ impl Timeline { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; resp.disk_consistent_lsn = state.disk_consistent_lsn; + // XXX Do we need to add state.last_received_lsn to resp? } } // Ping wal sender that new data might be available. diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index b54dcf02f4..7e01832510 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -426,6 +426,10 @@ impl PostgresBackend { // send that in the ErrorResponse though, because it's not relevant to the // compute node logs. warn!("query handler for {:?} failed: {:#}", m.body, e); + if e.to_string().contains("failed to run") { + self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; + return Ok(ProcessMsgResult::Break); + } self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; } self.write_message(&BeMessage::ReadyForQuery)?;