//! This module implements the streaming side of replication protocol, starting //! with the "START_REPLICATION" message. use crate::handler::SafekeeperPostgresHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use crate::wal_storage::WalReader; use anyhow::{bail, Context, Result}; use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::cmp::min; use std::net::Shutdown; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{str, thread}; use tokio::sync::mpsc::UnboundedSender; use tracing::*; use utils::{ bin_ser::BeSer, lsn::Lsn, postgres_backend::PostgresBackend, pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback}, sock_split::ReadStream, zid::{ZTenantId, ZTimelineId}, }; // See: https://www.postgresql.org/docs/13/protocol-replication.html const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; // zenith extension of replication protocol const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; type FullTransactionId = u64; /// Hot standby feedback received from replica #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub struct HotStandbyFeedback { pub ts: TimestampTz, pub xmin: FullTransactionId, pub catalog_xmin: FullTransactionId, } impl HotStandbyFeedback { pub fn empty() -> HotStandbyFeedback { HotStandbyFeedback { ts: 0, xmin: 0, catalog_xmin: 0, } } } /// Standby status update #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { pub write_lsn: Lsn, // last lsn received by pageserver pub flush_lsn: Lsn, // pageserver's disk consistent lSN pub apply_lsn: Lsn, // pageserver's remote consistent lSN pub reply_ts: TimestampTz, pub reply_requested: bool, } /// A network connection that's speaking the replication protocol. pub struct ReplicationConn { /// This is an `Option` because we will spawn a background thread that will /// `take` it from us. stream_in: Option, } /// Scope guard to unregister replication connection from timeline struct ReplicationConnGuard { replica: usize, // replica internal ID assigned by timeline timeline: Arc, } impl Drop for ReplicationConnGuard { fn drop(&mut self) { self.timeline.remove_replica(self.replica); } } // XXX: Naming is a bit messy here. // This ReplicationStreamGuard lives as long as ReplicationConn // and current ReplicationConnGuard is tied to the background thread // that receives feedback. struct ReplicationStreamGuard { tx: UnboundedSender, tenant_id: ZTenantId, timeline_id: ZTimelineId, pageserver_connstr: String, } impl Drop for ReplicationStreamGuard { fn drop(&mut self) { // the connection with pageserver is lost, // resume callback subscription debug!( "Connection to pageserver is gone. Resume callmemaybe subsciption if necessary. tenantid {} timelineid {}", self.tenant_id, self.timeline_id, ); let subscription_key = SubscriptionStateKey::new( self.tenant_id, self.timeline_id, self.pageserver_connstr.to_owned(), ); self.tx .send(CallmeEvent::Resume(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Resume request to callmemaybe thread {}", e); }); } } impl ReplicationConn { /// Create a new `ReplicationConn` pub fn new(pgb: &mut PostgresBackend) -> Self { Self { stream_in: pgb.take_stream_in(), } } /// Handle incoming messages from the network. /// This is spawned into the background by `handle_start_replication`. fn background_thread( mut stream_in: ReadStream, replica_guard: Arc, ) -> Result<()> { let replica_id = replica_guard.replica; let timeline = &replica_guard.timeline; let mut state = ReplicaState::new(); // Wait for replica's feedback. while let Some(msg) = FeMessage::read(&mut stream_in)? { match &msg { FeMessage::CopyData(m) => { // There's three possible data messages that the client is supposed to send here: // `HotStandbyFeedback` and `StandbyStatusUpdate` and `ZenithStandbyFeedback`. match m.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { // Note: deserializing is on m[1..] because we skip the tag byte. state.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; timeline.update_replica_state(replica_id, state); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let _reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; // This must be a regular postgres replica, // because pageserver doesn't send this type of messages to safekeeper. // Currently this is not implemented, so this message is ignored. warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet."); // timeline.update_replica_state(replica_id, Some(state)); } Some(ZENITH_STATUS_UPDATE_TAG_BYTE) => { // Note: deserializing is on m[9..] because we skip the tag byte and len bytes. let buf = Bytes::copy_from_slice(&m[9..]); let reply = ZenithFeedback::parse(buf); trace!("ZenithFeedback is {:?}", reply); // Only pageserver sends ZenithFeedback, so set the flag. // This replica is the source of information to resend to compute. state.zenith_feedback = Some(reply); timeline.update_replica_state(replica_id, state); } _ => warn!("unexpected message {:?}", msg), } } FeMessage::Sync => {} FeMessage::CopyFail => { // Shutdown the connection, because rust-postgres client cannot be dropped // when connection is alive. let _ = stream_in.shutdown(Shutdown::Both); bail!("Copy failed"); } _ => { // We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored. info!("unexpected message {:?}", msg); } } } Ok(()) } /// /// Handle START_REPLICATION replication command /// pub fn run( &mut self, spg: &mut SafekeeperPostgresHandler, pgb: &mut PostgresBackend, mut start_pos: Lsn, pageserver_connstr: Option, ) -> Result<()> { let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered(); // spawn the background thread which receives HotStandbyFeedback messages. let bg_timeline = Arc::clone(spg.timeline.get()); let bg_stream_in = self.stream_in.take().unwrap(); let state = ReplicaState::new(); // This replica_id is used below to check if it's time to stop replication. let replica_id = bg_timeline.add_replica(state); // Use a guard object to remove our entry from the timeline, when the background // thread and us have both finished using it. let replica_guard = Arc::new(ReplicationConnGuard { replica: replica_id, timeline: bg_timeline, }); let bg_replica_guard = Arc::clone(&replica_guard); // TODO: here we got two threads, one for writing WAL and one for receiving // feedback. If one of them fails, we should shutdown the other one too. let _ = thread::Builder::new() .name("HotStandbyFeedback thread".into()) .spawn(move || { if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) { error!("Replication background thread failed: {}", err); } })?; let mut wal_seg_size: usize; loop { wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize; if wal_seg_size == 0 { error!("Cannot start replication before connecting to wal_proposer"); sleep(Duration::from_secs(1)); } else { break; } } let wal_end = spg.timeline.get().get_end_of_wal(); // Walproposer gets special handling: safekeeper must give proposer all // local WAL till the end, whether committed or not (walproposer will // hang otherwise). That's because walproposer runs the consensus and // synchronizes safekeepers on the most advanced one. // // There is a small risk of this WAL getting concurrently garbaged if // another compute rises which collects majority and starts fixing log // on this safekeeper itself. That's ok as (old) proposer will never be // able to commit such WAL. let stop_pos: Option = if spg.appname == Some("wal_proposer_recovery".to_string()) { Some(wal_end) } else { None }; info!("Start replication from {:?} till {:?}", start_pos, stop_pos); // Don't spam pageserver with callmemaybe queries // when replication connection with pageserver is already established. let _guard = { if spg.appname == Some("wal_proposer_recovery".to_string()) { None } else { let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); let zttid = spg.timeline.get().zttid; let tx_clone = spg.timeline.get().callmemaybe_tx.clone(); let subscription_key = SubscriptionStateKey::new( zttid.tenant_id, zttid.timeline_id, pageserver_connstr.clone(), ); tx_clone .send(CallmeEvent::Pause(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Pause request to callmemaybe thread {}", e); }); // create a guard to subscribe callback again, when this connection will exit Some(ReplicationStreamGuard { tx: tx_clone, tenant_id: zttid.tenant_id, timeline_id: zttid.timeline_id, pageserver_connstr, }) } }; // switch to copy pgb.write_message(&BeMessage::CopyBothResponse)?; let mut end_pos = Lsn(0); let mut wal_reader = WalReader::new( spg.conf.timeline_dir(&spg.timeline.get().zttid), wal_seg_size, start_pos, ); // buffer for wal sending, limited by MAX_SEND_SIZE let mut send_buf = vec![0u8; MAX_SEND_SIZE]; loop { if let Some(stop_pos) = stop_pos { if start_pos >= stop_pos { break; /* recovery finished */ } end_pos = stop_pos; } else { /* Wait until we have some data to stream */ let lsn = spg.timeline.get().wait_for_lsn(start_pos); if let Some(lsn) = lsn { end_pos = lsn; } else { // TODO: also check once in a while whether we are walsender // to right pageserver. if spg.timeline.get().check_deactivate(replica_id)? { // Shut down, timeline is suspended. // TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); } // timeout expired: request pageserver status pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { sent_ptr: end_pos.0, timestamp: get_current_timestamp(), request_reply: true, })) .context("Failed to send KeepAlive message")?; continue; } } let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; let send_size = min(send_size, send_buf.len()); let send_buf = &mut send_buf[..send_size]; // read wal into buffer let send_size = wal_reader.read(send_buf)?; let send_buf = &send_buf[..send_size]; // Write some data to the network socket. pgb.write_message(&BeMessage::XLogData(XLogDataBody { wal_start: start_pos.0, wal_end: end_pos.0, timestamp: get_current_timestamp(), data: send_buf, })) .context("Failed to send XLogData")?; start_pos += send_size as u64; trace!("sent WAL up to {}", start_pos); } Ok(()) } }