diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 6a3ab62574..c02e0eeff4 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -7,7 +7,9 @@ use bytes::Bytes; use bytes::BytesMut; use tracing::*; +use crate::timeline::Timeline; use std::net::SocketAddr; +use std::sync::Arc; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; @@ -92,6 +94,9 @@ impl<'pg> ReceiveWalConn<'pg> { _ => bail!("unexpected message {:?} instead of greeting", msg), } + // Incoming WAL stream resumed, so reset information about the timeline pause. + swh.timeline.get().continue_streaming(); + // if requested, ask pageserver to fetch wal from us // as long as this wal_stream is alive, callmemaybe thread // will send requests to pageserver @@ -121,6 +126,7 @@ impl<'pg> ReceiveWalConn<'pg> { tx: tx_clone, tenant_id, timelineid, + timeline: Arc::clone(swh.timeline.get()), }) } None => None, @@ -144,10 +150,12 @@ struct SendWalHandlerGuard { tx: UnboundedSender, tenant_id: ZTenantId, timelineid: ZTimelineId, + timeline: Arc, } impl Drop for SendWalHandlerGuard { fn drop(&mut self) { + self.timeline.stop_streaming(); self.tx .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid)) .unwrap_or_else(|e| { diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index f284be3aed..223a3f231e 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -3,7 +3,7 @@ use crate::send_wal::SendWalHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; use postgres_ffi::xlog_utils::{ get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, @@ -30,8 +30,6 @@ use crate::callmemaybe::CallmeEvent; use tokio::sync::mpsc::UnboundedSender; use zenith_utils::zid::{ZTenantId, ZTimelineId}; -pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; - // See: 
https://www.postgresql.org/docs/13/protocol-replication.html const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; @@ -59,7 +57,7 @@ impl HotStandbyFeedback { /// Standby status update #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { - pub write_lsn: Lsn, // not used + pub write_lsn: Lsn, // last lsn received by pageserver pub flush_lsn: Lsn, // not used pub apply_lsn: Lsn, // pageserver's disk consistent lSN pub reply_ts: TimestampTz, @@ -99,7 +97,7 @@ impl Drop for ReplicationStreamGuard { fn drop(&mut self) { // the connection with pageserver is lost, // resume callback subscription - info!("Connection to pageserver is gone. Subscribe to callmemeybe again. tenantid {} timelineid {}", + debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}", self.tenant_id, self.timelineid); self.tx @@ -120,11 +118,14 @@ impl ReplicationConn { /// Handle incoming messages from the network. /// This is spawned into the background by `handle_start_replication`. - fn background_thread(mut stream_in: ReadStream, timeline: Arc) -> Result<()> { + fn background_thread( + mut stream_in: ReadStream, + timeline: Arc, + replica_id: usize, + ) -> Result<()> { let mut state = ReplicaState::new(); - let replica = timeline.add_replica(state); let _guard = ReplicationConnGuard { - replica, + replica: replica_id, timeline: timeline.clone(), }; // Wait for replica's feedback. @@ -139,13 +140,14 @@ impl ReplicationConn { // Note: deserializing is on m[1..] because we skip the tag byte. 
state.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; - timeline.update_replica_state(replica, Some(state)); + timeline.update_replica_state(replica_id, Some(state)); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; state.disk_consistent_lsn = reply.apply_lsn; - timeline.update_replica_state(replica, Some(state)); + state.last_received_lsn = reply.write_lsn; + timeline.update_replica_state(replica_id, Some(state)); } _ => warn!("unexpected message {:?}", msg), } @@ -212,12 +214,16 @@ impl ReplicationConn { let bg_timeline = Arc::clone(swh.timeline.get()); let bg_stream_in = self.stream_in.take().unwrap(); + let state = ReplicaState::new(); + // This replica_id is used below to check if it's time to stop replication. + let replica_id = bg_timeline.add_replica(state); + // TODO: here we got two threads, one for writing WAL and one for receiving // feedback. If one of them fails, we should shutdown the other one too. let _ = thread::Builder::new() .name("HotStandbyFeedback thread".into()) .spawn(move || { - if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) { + if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) { error!("Replication background thread failed: {}", err); } }) @@ -290,9 +296,17 @@ impl ReplicationConn { end_pos = stop_pos; } else { /* Wait until we have some data to stream */ - if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) { - end_pos = lsn + let lsn = swh.timeline.get().wait_for_lsn(start_pos); + + if let Some(lsn) = lsn { + end_pos = lsn; } else { + // Is it time to end streaming to this replica? 
+ if swh.timeline.get().check_stop_streaming(replica_id) { + // TODO create proper error type for this + bail!("end streaming to {:?}", swh.appname); + } + // timeout expired: request pageserver status pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { sent_ptr: end_pos.0, @@ -303,9 +317,6 @@ impl ReplicationConn { continue; } } - if end_pos == END_REPLICATION_MARKER { - break; - } // Take the `File` from `wal_file`, or open a new file. let mut file = match wal_file.take() { @@ -345,7 +356,7 @@ impl ReplicationConn { start_pos += send_size as u64; - debug!("sent WAL up to {}", start_pos); + info!("sent WAL up to {}", start_pos); // Decide whether to reuse this file. If we don't set wal_file here // a new file will be opened next time. diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index ce90fcf310..b379f5ce1e 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -78,7 +78,9 @@ impl postgres_backend::Handler for SendWalHandler { if query_string.starts_with(b"IDENTIFY_SYSTEM") { self.handle_identify_system(pgb)?; } else if query_string.starts_with(b"START_REPLICATION") { - ReplicationConn::new(pgb).run(self, pgb, &query_string)?; + ReplicationConn::new(pgb) + .run(self, pgb, &query_string) + .with_context(|| "failed to run ReplicationConn")?; } else if query_string.starts_with(b"START_WAL_PUSH") { // TODO: this repeats query decoding logic from page_service so it is probably // a good idea to refactor it in pgbackend and pass string to process query instead of bytes diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index fece685a57..b8063db5d4 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -18,7 +18,7 @@ use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::zid::{ZTenantId, ZTimelineId}; -use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; +use crate::replication::HotStandbyFeedback; use crate::safekeeper::{ AcceptorProposerMessage, 
ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, Storage, SK_FORMAT_VERSION, SK_MAGIC, @@ -38,6 +38,8 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); /// Replica status: host standby feedback + disk consistent lsn #[derive(Debug, Clone, Copy)] pub struct ReplicaState { + /// last known lsn received by replica + pub last_received_lsn: Lsn, // Lsn::MAX means we don't know /// combined disk_consistent_lsn of pageservers pub disk_consistent_lsn: Lsn, /// combined hot standby feedback from all replicas @@ -53,6 +55,7 @@ impl Default for ReplicaState { impl ReplicaState { pub fn new() -> ReplicaState { ReplicaState { + last_received_lsn: Lsn::MAX, disk_consistent_lsn: Lsn(u64::MAX), hs_feedback: HotStandbyFeedback { ts: 0, @@ -70,6 +73,9 @@ struct SharedState { /// For receiving-sending wal cooperation /// quorum commit LSN we've notified walsenders about notified_commit_lsn: Lsn, + // Set stop_lsn to inform WAL senders that it's time to stop sending WAL, + // so that they send all WAL up to stop_lsn and can safely exit streaming connections. + stop_lsn: Option, /// State of replicas replicas: Vec>, } @@ -92,7 +98,7 @@ lazy_static! 
{ } impl SharedState { - /// Get combined stateof all alive replicas + /// Get combined state of all alive replicas pub fn get_replicas_state(&self) -> ReplicaState { let mut acc = ReplicaState::new(); for state in self.replicas.iter().flatten() { @@ -101,6 +107,8 @@ impl SharedState { acc.hs_feedback.catalog_xmin = min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin); acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn); + // currently not used, but update it to be consistent + acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn); } acc } @@ -141,6 +149,7 @@ impl SharedState { Ok(Self { notified_commit_lsn: Lsn(0), + stop_lsn: None, sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state), replicas: Vec::new(), }) @@ -197,8 +206,52 @@ impl Timeline { } } - fn _stop_wal_senders(&self) { - self.notify_wal_senders(END_REPLICATION_MARKER); + // Notify WAL senders that it's time to stop sending WAL + pub fn stop_streaming(&self) { + let mut shared_state = self.mutex.lock().unwrap(); + // Ensure that safekeeper sends WAL up to the last known committed LSN. + // It guarantees that pageserver will receive all the latest data + // before walservice disconnects. + shared_state.stop_lsn = Some(shared_state.notified_commit_lsn); + trace!( + "Stopping WAL senders. stop_lsn: {}", + shared_state.notified_commit_lsn + ); + } + + // Reset stop_lsn notification, + // so that WAL senders will continue sending WAL + pub fn continue_streaming(&self) { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.stop_lsn = None; + } + + // Check if it's time to stop streaming to the given replica. + // + // Do not stop streaming until replica is caught up with the stop_lsn. + // This is not necessary for correctness, just an optimization to + // be able to remove WAL from safekeeper and decrease amount of work + // on the next start. 
+ pub fn check_stop_streaming(&self, replica_id: usize) -> bool { + let shared_state = self.mutex.lock().unwrap(); + + // If stop_lsn is set, it's time to shutdown streaming. + if let Some(stop_lsn_request) = shared_state.stop_lsn { + let replica_state = shared_state.replicas[replica_id].unwrap(); + // There is no data to stream, so other clauses don't matter. + if shared_state.notified_commit_lsn == Lsn(0) { + return true; + } + // Lsn::MAX means that we don't know the latest LSN yet. + // That may be a new replica, give it a chance to catch up. + if replica_state.last_received_lsn != Lsn::MAX + // If replica is fully caught up, disconnect it. + && stop_lsn_request <= replica_state.last_received_lsn + { + return true; + } + } + false } /// Pass arrived message to the safekeeper. @@ -220,6 +273,7 @@ impl Timeline { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; resp.disk_consistent_lsn = state.disk_consistent_lsn; + // XXX Do we need to add state.last_received_lsn to resp? } } // Ping wal sender that new data might be available. diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index b54dcf02f4..7e01832510 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -426,6 +426,10 @@ impl PostgresBackend { // send that in the ErrorResponse though, because it's not relevant to the // compute node logs. warn!("query handler for {:?} failed: {:#}", m.body, e); + if e.to_string().contains("failed to run") { + self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; + return Ok(ProcessMsgResult::Break); + } self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; } self.write_message(&BeMessage::ReadyForQuery)?;