diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs
new file mode 100644
index 0000000000..7aad3a62f2
--- /dev/null
+++ b/walkeeper/src/handler.rs
@@ -0,0 +1,165 @@
+//! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
+//! protocol commands.
+
+use crate::json_ctrl::handle_json_ctrl;
+use crate::receive_wal::ReceiveWalConn;
+use crate::send_wal::ReplicationConn;
+use crate::timeline::{Timeline, TimelineTools};
+use crate::SafeKeeperConf;
+use anyhow::{anyhow, bail, Context, Result};
+use bytes::Bytes;
+use postgres_ffi::xlog_utils::PG_TLI;
+use std::str::FromStr;
+use std::sync::Arc;
+use zenith_utils::postgres_backend;
+use zenith_utils::postgres_backend::PostgresBackend;
+use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID};
+use zenith_utils::zid::{ZTenantId, ZTimelineId};
+
+use crate::callmemaybe::CallmeEvent;
+use crate::timeline::CreateControlFile;
+use tokio::sync::mpsc::UnboundedSender;
+
+/// Safekeeper handler of postgres commands
+pub struct SafekeeperPostgresHandler {
+    pub conf: SafeKeeperConf,
+    /// assigned application name
+    pub appname: Option<String>,
+    pub tenantid: Option<ZTenantId>,
+    pub timelineid: Option<ZTimelineId>,
+    pub timeline: Option<Arc<Timeline>>,
+    //sender to communicate with callmemaybe thread
+    pub tx: UnboundedSender<CallmeEvent>,
+}
+
+impl postgres_backend::Handler for SafekeeperPostgresHandler {
+    fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
+        let ztimelineid = sm
+            .params
+            .get("ztimelineid")
+            .ok_or_else(|| anyhow!("timelineid is required"))?;
+        self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?);
+
+        let ztenantid = sm
+            .params
+            .get("ztenantid")
+            .ok_or_else(|| anyhow!("tenantid is required"))?;
+        self.tenantid = Some(ZTenantId::from_str(ztenantid)?);
+
+        if let Some(app_name) = sm.params.get("application_name") {
+            self.appname = Some(app_name.clone());
+        }
+
+        Ok(())
+    }
+
+    fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
+        // START_WAL_PUSH is the only command that initializes the timeline in production.
+        // There is also JSON_CTRL command, which should initialize the timeline for testing.
+        if self.timeline.is_none() {
+            if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL")
+            {
+                self.timeline.set(
+                    &self.conf,
+                    self.tenantid.unwrap(),
+                    self.timelineid.unwrap(),
+                    CreateControlFile::True,
+                )?;
+            } else {
+                self.timeline.set(
+                    &self.conf,
+                    self.tenantid.unwrap(),
+                    self.timelineid.unwrap(),
+                    CreateControlFile::False,
+                )?;
+            }
+        }
+        if query_string.starts_with(b"IDENTIFY_SYSTEM") {
+            self.handle_identify_system(pgb)?;
+        } else if query_string.starts_with(b"START_REPLICATION") {
+            ReplicationConn::new(pgb)
+                .run(self, pgb, &query_string)
+                .with_context(|| "failed to run ReplicationConn")?;
+        } else if query_string.starts_with(b"START_WAL_PUSH") {
+            // TODO: this repeats query decoding logic from page_service so it is probably
+            // a good idea to refactor it in pgbackend and pass string to process query instead of bytes
+            let decoded_query_string = match query_string.last() {
+                Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
+                _ => std::str::from_utf8(&query_string)?,
+            };
+            let pageserver_connstr = decoded_query_string
+                .split_whitespace()
+                .nth(1)
+                .map(|s| s.to_owned());
+            ReceiveWalConn::new(pgb, pageserver_connstr)
+                .run(self)
+                .with_context(|| "failed to run ReceiveWalConn")?;
+        } else if query_string.starts_with(b"JSON_CTRL") {
+            handle_json_ctrl(self, pgb, &query_string)?;
+        } else {
+            bail!("Unexpected command {:?}", query_string);
+        }
+        Ok(())
+    }
+}
+
+impl SafekeeperPostgresHandler {
+    pub fn new(conf: SafeKeeperConf, tx: UnboundedSender<CallmeEvent>) -> Self {
+        SafekeeperPostgresHandler {
+            conf,
+            appname: None,
+            tenantid: None,
+            timelineid: None,
+            timeline: None,
+            tx,
+        }
+    }
+
+    ///
+    /// Handle IDENTIFY_SYSTEM replication command
+    ///
+    fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
+        let start_pos = self.timeline.get().get_end_of_wal();
+        let lsn = start_pos.to_string();
+        let sysid = self.timeline.get().get_info().server.system_id.to_string();
+        let lsn_bytes = lsn.as_bytes();
+        let tli = PG_TLI.to_string();
+        let tli_bytes = tli.as_bytes();
+        let sysid_bytes = sysid.as_bytes();
+
+        pgb.write_message_noflush(&BeMessage::RowDescription(&[
+            RowDescriptor {
+                name: b"systemid",
+                typoid: TEXT_OID,
+                typlen: -1,
+                ..Default::default()
+            },
+            RowDescriptor {
+                name: b"timeline",
+                typoid: INT4_OID,
+                typlen: 4,
+                ..Default::default()
+            },
+            RowDescriptor {
+                name: b"xlogpos",
+                typoid: TEXT_OID,
+                typlen: -1,
+                ..Default::default()
+            },
+            RowDescriptor {
+                name: b"dbname",
+                typoid: TEXT_OID,
+                typlen: -1,
+                ..Default::default()
+            },
+        ]))?
+        .write_message_noflush(&BeMessage::DataRow(&[
+            Some(sysid_bytes),
+            Some(tli_bytes),
+            Some(lsn_bytes),
+            None,
+        ]))?
+        .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
+        Ok(())
+    }
+}
diff --git a/walkeeper/src/json_ctrl.rs b/walkeeper/src/json_ctrl.rs
index 685b4e6335..82fa748c7f 100644
--- a/walkeeper/src/json_ctrl.rs
+++ b/walkeeper/src/json_ctrl.rs
@@ -12,12 +12,12 @@
 use crc32c::crc32c_append;
 use serde::{Deserialize, Serialize};
 use tracing::*;
+use crate::handler::SafekeeperPostgresHandler;
 use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
 use crate::safekeeper::{
     AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
 };
 use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
-use crate::send_wal::SendWalHandler;
 use crate::timeline::TimelineTools;
 use postgres_ffi::pg_constants;
 use postgres_ffi::xlog_utils;
@@ -57,7 +57,7 @@ struct AppendResult {
 /// content, and then append it with specified term and lsn. This
 /// function is used to test safekeepers in different scenarios.
 pub fn handle_json_ctrl(
-    swh: &mut SendWalHandler,
+    spg: &mut SafekeeperPostgresHandler,
     pgb: &mut PostgresBackend,
     cmd: &Bytes,
 ) -> Result<()> {
@@ -71,16 +71,16 @@ pub fn handle_json_ctrl(
     info!("JSON_CTRL request: {:?}", append_request);

     // need to init safekeeper state before AppendRequest
-    prepare_safekeeper(swh)?;
+    prepare_safekeeper(spg)?;

     // if send_proposer_elected is true, we need to update local history
     if append_request.send_proposer_elected {
-        send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
+        send_proposer_elected(spg, append_request.term, append_request.epoch_start_lsn)?;
     }

-    let inserted_wal = append_logical_message(swh, append_request)?;
+    let inserted_wal = append_logical_message(spg, append_request)?;
     let response = AppendResult {
-        state: swh.timeline.get().get_info(),
+        state: spg.timeline.get().get_info(),
         inserted_wal,
     };
     let response_data = serde_json::to_vec(&response)?;
@@ -98,28 +98,28 @@ pub fn handle_json_ctrl(
 /// Prepare safekeeper to process append requests without crashes,
 /// by sending ProposerGreeting with default server.wal_seg_size.
-fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
+fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
     let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
         protocol_version: 1, // current protocol
         pg_version: 0,       // unknown
         proposer_id: [0u8; 16],
         system_id: 0,
-        ztli: swh.timelineid.unwrap(),
-        tenant_id: swh.tenantid.unwrap(),
+        ztli: spg.timelineid.unwrap(),
+        tenant_id: spg.tenantid.unwrap(),
         tli: 0,
         wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
     });
-    let response = swh.timeline.get().process_msg(&greeting_request)?;
+    let response = spg.timeline.get().process_msg(&greeting_request)?;
     match response {
         Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
         _ => anyhow::bail!("not GreetingResponse"),
     }
 }

-fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
+fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
     // add new term to existing history
-    let history = swh.timeline.get().get_info().acceptor_state.term_history;
+    let history = spg.timeline.get().get_info().acceptor_state.term_history;
     let history = history.up_to(lsn.checked_sub(1u64).unwrap());
     let mut history_entries = history.0;
     history_entries.push(TermSwitchEntry { term, lsn });
@@ -131,7 +131,7 @@ fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
         term_history: history,
     });

-    swh.timeline.get().process_msg(&proposer_elected_request)?;
+    spg.timeline.get().process_msg(&proposer_elected_request)?;
     Ok(())
 }

@@ -145,11 +145,11 @@ struct InsertedWAL {
 /// Extend local WAL with new LogicalMessage record. To do that,
 /// create AppendRequest with new WAL and pass it to safekeeper.
 fn append_logical_message(
-    swh: &mut SendWalHandler,
+    spg: &mut SafekeeperPostgresHandler,
     msg: AppendLogicalMessage,
 ) -> Result<InsertedWAL> {
     let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
-    let sk_state = swh.timeline.get().get_info();
+    let sk_state = spg.timeline.get().get_info();

     let begin_lsn = msg.begin_lsn;
     let end_lsn = begin_lsn + wal_data.len() as u64;
@@ -173,7 +173,7 @@ fn append_logical_message(
         wal_data: Bytes::from(wal_data),
     });

-    let response = swh.timeline.get().process_msg(&append_request)?;
+    let response = spg.timeline.get().process_msg(&append_request)?;

     let append_response = match response {
         Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index 0ad4e7d644..08a4dd75f5 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -5,10 +5,10 @@ use std::time::Duration;
 use zenith_utils::zid::ZTimelineId;

 pub mod callmemaybe;
+pub mod handler;
 pub mod http;
 pub mod json_ctrl;
 pub mod receive_wal;
-pub mod replication;
 pub mod s3_offload;
 pub mod safekeeper;
 pub mod send_wal;
diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index c02e0eeff4..bd8a2ed0dd 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -1,4 +1,4 @@
-//! Safekeeper communication endpoint to wal proposer (compute node).
+//! Safekeeper communication endpoint to WAL proposer (compute node).
 //! Gets messages from the network, passes them down to consensus module and
 //! sends replies back.
@@ -14,7 +14,7 @@ use std::sync::Arc; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; -use crate::send_wal::SendWalHandler; +use crate::handler::SafekeeperPostgresHandler; use crate::timeline::TimelineTools; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeMessage}; @@ -71,8 +71,8 @@ impl<'pg> ReceiveWalConn<'pg> { } /// Receive WAL from wal_proposer - pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> { - let _enter = info_span!("WAL acceptor", timeline = %swh.timelineid.unwrap()).entered(); + pub fn run(&mut self, spg: &mut SafekeeperPostgresHandler) -> Result<()> { + let _enter = info_span!("WAL acceptor", timeline = %spg.timelineid.unwrap()).entered(); // Notify the libpq client that it's allowed to send `CopyData` messages self.pg_backend @@ -95,7 +95,7 @@ impl<'pg> ReceiveWalConn<'pg> { } // Incoming WAL stream resumed, so reset information about the timeline pause. - swh.timeline.get().continue_streaming(); + spg.timeline.get().continue_streaming(); // if requested, ask pageserver to fetch wal from us // as long as this wal_stream is alive, callmemaybe thread @@ -105,10 +105,10 @@ impl<'pg> ReceiveWalConn<'pg> { // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver // we should use callmemaybe mechanism. - let timelineid = swh.timeline.get().timelineid; - let tx_clone = swh.tx.clone(); + let timelineid = spg.timeline.get().timelineid; + let tx_clone = spg.tx.clone(); let pageserver_connstr = pageserver_connstr.to_owned(); - swh.tx + spg.tx .send(CallmeEvent::Subscribe( tenant_id, timelineid, @@ -126,14 +126,14 @@ impl<'pg> ReceiveWalConn<'pg> { tx: tx_clone, tenant_id, timelineid, - timeline: Arc::clone(swh.timeline.get()), + timeline: Arc::clone(spg.timeline.get()), }) } None => None, }; loop { - let reply = swh + let reply = spg .timeline .get() .process_msg(&msg) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs deleted file mode 100644 index 223a3f231e..0000000000 --- a/walkeeper/src/replication.rs +++ /dev/null @@ -1,369 +0,0 @@ -//! This module implements the streaming side of replication protocol, starting -//! with the "START_REPLICATION" message. 
- -use crate::send_wal::SendWalHandler; -use crate::timeline::{ReplicaState, Timeline, TimelineTools}; -use anyhow::{anyhow, bail, Context, Result}; -use bytes::Bytes; -use postgres_ffi::xlog_utils::{ - get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, -}; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use std::cmp::min; -use std::fs::File; -use std::io::{Read, Seek, SeekFrom}; -use std::net::Shutdown; -use std::path::Path; -use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; -use std::{str, thread}; -use tracing::*; -use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::Lsn; -use zenith_utils::postgres_backend::PostgresBackend; -use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody}; -use zenith_utils::sock_split::ReadStream; - -use crate::callmemaybe::CallmeEvent; -use tokio::sync::mpsc::UnboundedSender; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; - -// See: https://www.postgresql.org/docs/13/protocol-replication.html -const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; -const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; - -type FullTransactionId = u64; - -/// Hot standby feedback received from replica -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -pub struct HotStandbyFeedback { - pub ts: TimestampTz, - pub xmin: FullTransactionId, - pub catalog_xmin: FullTransactionId, -} - -impl HotStandbyFeedback { - pub fn empty() -> HotStandbyFeedback { - HotStandbyFeedback { - ts: 0, - xmin: 0, - catalog_xmin: 0, - } - } -} - -/// Standby status update -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StandbyReply { - pub write_lsn: Lsn, // last lsn received by pageserver - pub flush_lsn: Lsn, // not used - pub apply_lsn: Lsn, // pageserver's disk consistent lSN - pub reply_ts: TimestampTz, - pub reply_requested: bool, -} - -/// A network connection that's speaking the replication protocol. -pub struct ReplicationConn { - /// This is an `Option` because we will spawn a background thread that will - /// `take` it from us. - stream_in: Option, -} - -/// Scope guard to unregister replication connection from timeline -struct ReplicationConnGuard { - replica: usize, // replica internal ID assigned by timeline - timeline: Arc, -} - -impl Drop for ReplicationConnGuard { - fn drop(&mut self) { - self.timeline.update_replica_state(self.replica, None); - } -} - -// XXX: Naming is a bit messy here. -// This ReplicationStreamGuard lives as long as ReplicationConn -// and current ReplicationConnGuard is tied to the background thread -// that receives feedback. -struct ReplicationStreamGuard { - tx: UnboundedSender, - tenant_id: ZTenantId, - timelineid: ZTimelineId, -} - -impl Drop for ReplicationStreamGuard { - fn drop(&mut self) { - // the connection with pageserver is lost, - // resume callback subscription - debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}", - self.tenant_id, self.timelineid); - - self.tx - .send(CallmeEvent::Resume(self.tenant_id, self.timelineid)) - .unwrap_or_else(|e| { - error!("failed to send Resume request to callmemaybe thread {}", e); - }); - } -} - -impl ReplicationConn { - /// Create a new `ReplicationConn` - pub fn new(pgb: &mut PostgresBackend) -> Self { - Self { - stream_in: pgb.take_stream_in(), - } - } - - /// Handle incoming messages from the network. - /// This is spawned into the background by `handle_start_replication`. 
- fn background_thread( - mut stream_in: ReadStream, - timeline: Arc, - replica_id: usize, - ) -> Result<()> { - let mut state = ReplicaState::new(); - let _guard = ReplicationConnGuard { - replica: replica_id, - timeline: timeline.clone(), - }; - // Wait for replica's feedback. - while let Some(msg) = FeMessage::read(&mut stream_in)? { - match &msg { - FeMessage::CopyData(m) => { - // There's two possible data messages that the client is supposed to send here: - // `HotStandbyFeedback` and `StandbyStatusUpdate`. - - match m.first().cloned() { - Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { - // Note: deserializing is on m[1..] because we skip the tag byte. - state.hs_feedback = HotStandbyFeedback::des(&m[1..]) - .context("failed to deserialize HotStandbyFeedback")?; - timeline.update_replica_state(replica_id, Some(state)); - } - Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { - let reply = StandbyReply::des(&m[1..]) - .context("failed to deserialize StandbyReply")?; - state.disk_consistent_lsn = reply.apply_lsn; - state.last_received_lsn = reply.write_lsn; - timeline.update_replica_state(replica_id, Some(state)); - } - _ => warn!("unexpected message {:?}", msg), - } - } - FeMessage::Sync => {} - FeMessage::CopyFail => { - // Shutdown the connection, because rust-postgres client cannot be dropped - // when connection is alive. - let _ = stream_in.shutdown(Shutdown::Both); - return Err(anyhow!("Copy failed")); - } - _ => { - // We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored. - info!("unexpected message {:?}", msg); - } - } - } - - Ok(()) - } - - /// Helper function that parses a single LSN. - fn parse_start(cmd: &[u8]) -> Result { - let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); - let caps = re.captures_iter(str::from_utf8(cmd)?); - let mut lsns = caps.map(|cap| cap[1].parse::()); - let start_pos = lsns - .next() - .ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??; - assert!(lsns.next().is_none()); - Ok(start_pos) - } - - /// Helper function for opening a wal file. - fn open_wal_file(wal_file_path: &Path) -> Result { - // First try to open the .partial file. - let mut partial_path = wal_file_path.to_owned(); - partial_path.set_extension("partial"); - if let Ok(opened_file) = File::open(&partial_path) { - return Ok(opened_file); - } - - // If that failed, try it without the .partial extension. - File::open(&wal_file_path) - .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) - .map_err(|e| { - error!("{}", e); - e - }) - } - - /// - /// Handle START_REPLICATION replication command - /// - pub fn run( - &mut self, - swh: &mut SendWalHandler, - pgb: &mut PostgresBackend, - cmd: &Bytes, - ) -> Result<()> { - let _enter = info_span!("WAL sender", timeline = %swh.timelineid.unwrap()).entered(); - - // spawn the background thread which receives HotStandbyFeedback messages. - let bg_timeline = Arc::clone(swh.timeline.get()); - let bg_stream_in = self.stream_in.take().unwrap(); - - let state = ReplicaState::new(); - // This replica_id is used below to check if it's time to stop replication. - let replica_id = bg_timeline.add_replica(state); - - // TODO: here we got two threads, one for writing WAL and one for receiving - // feedback. If one of them fails, we should shutdown the other one too. 
- let _ = thread::Builder::new() - .name("HotStandbyFeedback thread".into()) - .spawn(move || { - if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) { - error!("Replication background thread failed: {}", err); - } - }) - .unwrap(); - - let mut start_pos = Self::parse_start(cmd)?; - - let mut wal_seg_size: usize; - loop { - wal_seg_size = swh.timeline.get().get_info().server.wal_seg_size as usize; - if wal_seg_size == 0 { - error!("Cannot start replication before connecting to wal_proposer"); - sleep(Duration::from_secs(1)); - } else { - break; - } - } - let wal_end = swh.timeline.get().get_end_of_wal(); - // Walproposer gets special handling: safekeeper must give proposer all - // local WAL till the end, whether committed or not (walproposer will - // hang otherwise). That's because walproposer runs the consensus and - // synchronizes safekeepers on the most advanced one. - // - // There is a small risk of this WAL getting concurrently garbaged if - // another compute rises which collects majority and starts fixing log - // on this safekeeper itself. That's ok as (old) proposer will never be - // able to commit such WAL. - let stop_pos: Option = if swh.appname == Some("wal_proposer_recovery".to_string()) { - Some(wal_end) - } else { - None - }; - info!("Start replication from {:?} till {:?}", start_pos, stop_pos); - - // Don't spam pageserver with callmemaybe queries - // when replication connection with pageserver is already established. - let _guard = { - if swh.appname == Some("wal_proposer_recovery".to_string()) { - None - } else { - let timelineid = swh.timeline.get().timelineid; - let tenant_id = swh.tenantid.unwrap(); - let tx_clone = swh.tx.clone(); - swh.tx - .send(CallmeEvent::Pause(tenant_id, timelineid)) - .unwrap_or_else(|e| { - error!("failed to send Pause request to callmemaybe thread {}", e); - }); - - // create a guard to subscribe callback again, when this connection will exit - Some(ReplicationStreamGuard { - tx: tx_clone, - tenant_id, - timelineid, - }) - } - }; - - // switch to copy - pgb.write_message(&BeMessage::CopyBothResponse)?; - - let mut end_pos = Lsn(0); - let mut wal_file: Option = None; - - loop { - if let Some(stop_pos) = stop_pos { - if start_pos >= stop_pos { - break; /* recovery finished */ - } - end_pos = stop_pos; - } else { - /* Wait until we have some data to stream */ - let lsn = swh.timeline.get().wait_for_lsn(start_pos); - - if let Some(lsn) = lsn { - end_pos = lsn; - } else { - // Is is time to end streaming to this replica? - if swh.timeline.get().check_stop_streaming(replica_id) { - // TODO create proper error type for this - bail!("end streaming to {:?}", swh.appname); - } - - // timeout expired: request pageserver status - pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { - sent_ptr: end_pos.0, - timestamp: get_current_timestamp(), - request_reply: true, - })) - .context("Failed to send KeepAlive message")?; - continue; - } - } - - // Take the `File` from `wal_file`, or open a new file. - let mut file = match wal_file.take() { - Some(file) => file, - None => { - // Open a new file. - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let timeline_id = swh.timeline.get().timelineid; - let wal_file_path = swh.conf.timeline_dir(&timeline_id).join(wal_file_name); - Self::open_wal_file(&wal_file_path)? - } - }; - - let xlogoff = start_pos.segment_offset(wal_seg_size) as usize; - - // How much to read and send in message? 
We cannot cross the WAL file - // boundary, and we don't want send more than MAX_SEND_SIZE. - let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - let send_size = min(send_size, wal_seg_size - xlogoff); - let send_size = min(send_size, MAX_SEND_SIZE); - - // Read some data from the file. - let mut file_buf = vec![0u8; send_size]; - file.seek(SeekFrom::Start(xlogoff as u64)) - .and_then(|_| file.read_exact(&mut file_buf)) - .context("Failed to read data from WAL file")?; - - // Write some data to the network socket. - pgb.write_message(&BeMessage::XLogData(XLogDataBody { - wal_start: start_pos.0, - wal_end: end_pos.0, - timestamp: get_current_timestamp(), - data: &file_buf, - })) - .context("Failed to send XLogData")?; - - start_pos += send_size as u64; - - info!("sent WAL up to {}", start_pos); - - // Decide whether to reuse this file. If we don't set wal_file here - // a new file will be opened next time. - if start_pos.segment_offset(wal_seg_size) != 0 { - wal_file = Some(file); - } - } - Ok(()) - } -} diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 89b5a8cdea..da2fff41e3 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -18,7 +18,7 @@ use tracing::*; use lazy_static::lazy_static; -use crate::replication::HotStandbyFeedback; +use crate::send_wal::HotStandbyFeedback; use postgres_ffi::xlog_utils::MAX_SEND_SIZE; use zenith_metrics::{ register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec, diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index b379f5ce1e..a27308079e 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -1,166 +1,369 @@ -//! Part of Safekeeper pretending to be Postgres, streaming xlog to -//! pageserver/any other consumer. -//! +//! This module implements the streaming side of replication protocol, starting +//! with the "START_REPLICATION" message. 
-use crate::json_ctrl::handle_json_ctrl; -use crate::receive_wal::ReceiveWalConn; -use crate::replication::ReplicationConn; -use crate::timeline::{Timeline, TimelineTools}; -use crate::SafeKeeperConf; +use crate::handler::SafekeeperPostgresHandler; +use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; -use postgres_ffi::xlog_utils::PG_TLI; -use std::str::FromStr; +use postgres_ffi::xlog_utils::{ + get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, +}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use std::cmp::min; +use std::fs::File; +use std::io::{Read, Seek, SeekFrom}; +use std::net::Shutdown; +use std::path::Path; use std::sync::Arc; -use zenith_utils::postgres_backend; +use std::thread::sleep; +use std::time::Duration; +use std::{str, thread}; +use tracing::*; +use zenith_utils::bin_ser::BeSer; +use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::PostgresBackend; -use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID}; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody}; +use zenith_utils::sock_split::ReadStream; use crate::callmemaybe::CallmeEvent; -use crate::timeline::CreateControlFile; use tokio::sync::mpsc::UnboundedSender; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; -/// Handler for streaming WAL from acceptor -pub struct SendWalHandler { - pub conf: SafeKeeperConf, - /// assigned application name - pub appname: Option, - pub tenantid: Option, - pub timelineid: Option, - pub timeline: Option>, - //sender to communicate with callmemaybe thread - pub tx: UnboundedSender, +// See: https://www.postgresql.org/docs/13/protocol-replication.html +const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; +const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; + +type FullTransactionId = u64; + +/// Hot standby feedback received from replica +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub struct HotStandbyFeedback { + pub ts: TimestampTz, + pub xmin: FullTransactionId, + pub catalog_xmin: FullTransactionId, } -impl postgres_backend::Handler for SendWalHandler { - fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> { - let ztimelineid = sm - .params - .get("ztimelineid") - .ok_or_else(|| anyhow!("timelineid is required"))?; - self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?); - - let ztenantid = sm - .params - .get("ztenantid") - .ok_or_else(|| anyhow!("tenantid is required"))?; - self.tenantid = Some(ZTenantId::from_str(ztenantid)?); - - if let Some(app_name) = sm.params.get("application_name") { - self.appname = Some(app_name.clone()); +impl HotStandbyFeedback { + pub fn empty() -> HotStandbyFeedback { + HotStandbyFeedback { + ts: 0, + xmin: 0, + catalog_xmin: 0, } + } +} - Ok(()) +/// Standby status update +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StandbyReply { + pub write_lsn: Lsn, // last lsn received by pageserver + pub flush_lsn: Lsn, // not used + pub apply_lsn: Lsn, // pageserver's disk consistent lSN + pub reply_ts: TimestampTz, + pub reply_requested: bool, +} + +/// A network connection that's speaking the replication protocol. +pub struct ReplicationConn { + /// This is an `Option` because we will spawn a background thread that will + /// `take` it from us. 
+ stream_in: Option, +} + +/// Scope guard to unregister replication connection from timeline +struct ReplicationConnGuard { + replica: usize, // replica internal ID assigned by timeline + timeline: Arc, +} + +impl Drop for ReplicationConnGuard { + fn drop(&mut self) { + self.timeline.update_replica_state(self.replica, None); + } +} + +// XXX: Naming is a bit messy here. +// This ReplicationStreamGuard lives as long as ReplicationConn +// and current ReplicationConnGuard is tied to the background thread +// that receives feedback. +struct ReplicationStreamGuard { + tx: UnboundedSender, + tenant_id: ZTenantId, + timelineid: ZTimelineId, +} + +impl Drop for ReplicationStreamGuard { + fn drop(&mut self) { + // the connection with pageserver is lost, + // resume callback subscription + debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}", + self.tenant_id, self.timelineid); + + self.tx + .send(CallmeEvent::Resume(self.tenant_id, self.timelineid)) + .unwrap_or_else(|e| { + error!("failed to send Resume request to callmemaybe thread {}", e); + }); + } +} + +impl ReplicationConn { + /// Create a new `ReplicationConn` + pub fn new(pgb: &mut PostgresBackend) -> Self { + Self { + stream_in: pgb.take_stream_in(), + } } - fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> { - // START_WAL_PUSH is the only command that initializes the timeline in production. - // There is also JSON_CTRL command, which should initialize the timeline for testing. - if self.timeline.is_none() { - if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL") - { - self.timeline.set( - &self.conf, - self.tenantid.unwrap(), - self.timelineid.unwrap(), - CreateControlFile::True, - )?; - } else { - self.timeline.set( - &self.conf, - self.tenantid.unwrap(), - self.timelineid.unwrap(), - CreateControlFile::False, - )?; + /// Handle incoming messages from the network. + /// This is spawned into the background by `handle_start_replication`. + fn background_thread( + mut stream_in: ReadStream, + timeline: Arc, + replica_id: usize, + ) -> Result<()> { + let mut state = ReplicaState::new(); + let _guard = ReplicationConnGuard { + replica: replica_id, + timeline: timeline.clone(), + }; + // Wait for replica's feedback. + while let Some(msg) = FeMessage::read(&mut stream_in)? { + match &msg { + FeMessage::CopyData(m) => { + // There's two possible data messages that the client is supposed to send here: + // `HotStandbyFeedback` and `StandbyStatusUpdate`. + + match m.first().cloned() { + Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { + // Note: deserializing is on m[1..] because we skip the tag byte. + state.hs_feedback = HotStandbyFeedback::des(&m[1..]) + .context("failed to deserialize HotStandbyFeedback")?; + timeline.update_replica_state(replica_id, Some(state)); + } + Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { + let reply = StandbyReply::des(&m[1..]) + .context("failed to deserialize StandbyReply")?; + state.disk_consistent_lsn = reply.apply_lsn; + state.last_received_lsn = reply.write_lsn; + timeline.update_replica_state(replica_id, Some(state)); + } + _ => warn!("unexpected message {:?}", msg), + } + } + FeMessage::Sync => {} + FeMessage::CopyFail => { + // Shutdown the connection, because rust-postgres client cannot be dropped + // when connection is alive. 
+ let _ = stream_in.shutdown(Shutdown::Both); + return Err(anyhow!("Copy failed")); + } + _ => { + // We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored. + info!("unexpected message {:?}", msg); + } } } - if query_string.starts_with(b"IDENTIFY_SYSTEM") { - self.handle_identify_system(pgb)?; - } else if query_string.starts_with(b"START_REPLICATION") { - ReplicationConn::new(pgb) - .run(self, pgb, &query_string) - .with_context(|| "failed to run ReplicationConn")?; - } else if query_string.starts_with(b"START_WAL_PUSH") { - // TODO: this repeats query decoding logic from page_service so it is probably - // a good idea to refactor it in pgbackend and pass string to process query instead of bytes - let decoded_query_string = match query_string.last() { - Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?, - _ => std::str::from_utf8(&query_string)?, - }; - let pageserver_connstr = decoded_query_string - .split_whitespace() - .nth(1) - .map(|s| s.to_owned()); - ReceiveWalConn::new(pgb, pageserver_connstr) - .run(self) - .with_context(|| "failed to run ReceiveWalConn")?; - } else if query_string.starts_with(b"JSON_CTRL") { - handle_json_ctrl(self, pgb, &query_string)?; + + Ok(()) + } + + /// Helper function that parses a single LSN. + fn parse_start(cmd: &[u8]) -> Result { + let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); + let caps = re.captures_iter(str::from_utf8(cmd)?); + let mut lsns = caps.map(|cap| cap[1].parse::()); + let start_pos = lsns + .next() + .ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??; + assert!(lsns.next().is_none()); + Ok(start_pos) + } + + /// Helper function for opening a wal file. + fn open_wal_file(wal_file_path: &Path) -> Result { + // First try to open the .partial file. + let mut partial_path = wal_file_path.to_owned(); + partial_path.set_extension("partial"); + if let Ok(opened_file) = File::open(&partial_path) { + return Ok(opened_file); + } + + // If that failed, try it without the .partial extension. + File::open(&wal_file_path) + .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) + .map_err(|e| { + error!("{}", e); + e + }) + } + + /// + /// Handle START_REPLICATION replication command + /// + pub fn run( + &mut self, + spg: &mut SafekeeperPostgresHandler, + pgb: &mut PostgresBackend, + cmd: &Bytes, + ) -> Result<()> { + let _enter = info_span!("WAL sender", timeline = %spg.timelineid.unwrap()).entered(); + + // spawn the background thread which receives HotStandbyFeedback messages. + let bg_timeline = Arc::clone(spg.timeline.get()); + let bg_stream_in = self.stream_in.take().unwrap(); + + let state = ReplicaState::new(); + // This replica_id is used below to check if it's time to stop replication. + let replica_id = bg_timeline.add_replica(state); + + // TODO: here we got two threads, one for writing WAL and one for receiving + // feedback. If one of them fails, we should shutdown the other one too. 
+ let _ = thread::Builder::new() + .name("HotStandbyFeedback thread".into()) + .spawn(move || { + if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) { + error!("Replication background thread failed: {}", err); + } + }) + .unwrap(); + + let mut start_pos = Self::parse_start(cmd)?; + + let mut wal_seg_size: usize; + loop { + wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize; + if wal_seg_size == 0 { + error!("Cannot start replication before connecting to wal_proposer"); + sleep(Duration::from_secs(1)); + } else { + break; + } + } + let wal_end = spg.timeline.get().get_end_of_wal(); + // Walproposer gets special handling: safekeeper must give proposer all + // local WAL till the end, whether committed or not (walproposer will + // hang otherwise). That's because walproposer runs the consensus and + // synchronizes safekeepers on the most advanced one. + // + // There is a small risk of this WAL getting concurrently garbaged if + // another compute rises which collects majority and starts fixing log + // on this safekeeper itself. That's ok as (old) proposer will never be + // able to commit such WAL. + let stop_pos: Option = if spg.appname == Some("wal_proposer_recovery".to_string()) { + Some(wal_end) } else { - bail!("Unexpected command {:?}", query_string); + None + }; + info!("Start replication from {:?} till {:?}", start_pos, stop_pos); + + // Don't spam pageserver with callmemaybe queries + // when replication connection with pageserver is already established. + let _guard = { + if spg.appname == Some("wal_proposer_recovery".to_string()) { + None + } else { + let timelineid = spg.timeline.get().timelineid; + let tenant_id = spg.tenantid.unwrap(); + let tx_clone = spg.tx.clone(); + spg.tx + .send(CallmeEvent::Pause(tenant_id, timelineid)) + .unwrap_or_else(|e| { + error!("failed to send Pause request to callmemaybe thread {}", e); + }); + + // create a guard to subscribe callback again, when this connection will exit + Some(ReplicationStreamGuard { + tx: tx_clone, + tenant_id, + timelineid, + }) + } + }; + + // switch to copy + pgb.write_message(&BeMessage::CopyBothResponse)?; + + let mut end_pos = Lsn(0); + let mut wal_file: Option = None; + + loop { + if let Some(stop_pos) = stop_pos { + if start_pos >= stop_pos { + break; /* recovery finished */ + } + end_pos = stop_pos; + } else { + /* Wait until we have some data to stream */ + let lsn = spg.timeline.get().wait_for_lsn(start_pos); + + if let Some(lsn) = lsn { + end_pos = lsn; + } else { + // Is is time to end streaming to this replica? + if spg.timeline.get().check_stop_streaming(replica_id) { + // TODO create proper error type for this + bail!("end streaming to {:?}", spg.appname); + } + + // timeout expired: request pageserver status + pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + sent_ptr: end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .context("Failed to send KeepAlive message")?; + continue; + } + } + + // Take the `File` from `wal_file`, or open a new file. + let mut file = match wal_file.take() { + Some(file) => file, + None => { + // Open a new file. + let segno = start_pos.segment_number(wal_seg_size); + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let timeline_id = spg.timeline.get().timelineid; + let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name); + Self::open_wal_file(&wal_file_path)? 
+ } + }; + + let xlogoff = start_pos.segment_offset(wal_seg_size) as usize; + + // How much to read and send in message? We cannot cross the WAL file + // boundary, and we don't want send more than MAX_SEND_SIZE. + let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; + let send_size = min(send_size, wal_seg_size - xlogoff); + let send_size = min(send_size, MAX_SEND_SIZE); + + // Read some data from the file. + let mut file_buf = vec![0u8; send_size]; + file.seek(SeekFrom::Start(xlogoff as u64)) + .and_then(|_| file.read_exact(&mut file_buf)) + .context("Failed to read data from WAL file")?; + + // Write some data to the network socket. + pgb.write_message(&BeMessage::XLogData(XLogDataBody { + wal_start: start_pos.0, + wal_end: end_pos.0, + timestamp: get_current_timestamp(), + data: &file_buf, + })) + .context("Failed to send XLogData")?; + + start_pos += send_size as u64; + + info!("sent WAL up to {}", start_pos); + + // Decide whether to reuse this file. If we don't set wal_file here + // a new file will be opened next time. + if start_pos.segment_offset(wal_seg_size) != 0 { + wal_file = Some(file); + } } Ok(()) } } - -impl SendWalHandler { - pub fn new(conf: SafeKeeperConf, tx: UnboundedSender) -> Self { - SendWalHandler { - conf, - appname: None, - tenantid: None, - timelineid: None, - timeline: None, - tx, - } - } - - /// - /// Handle IDENTIFY_SYSTEM replication command - /// - fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> { - let start_pos = self.timeline.get().get_end_of_wal(); - let lsn = start_pos.to_string(); - let sysid = self.timeline.get().get_info().server.system_id.to_string(); - let lsn_bytes = lsn.as_bytes(); - let tli = PG_TLI.to_string(); - let tli_bytes = tli.as_bytes(); - let sysid_bytes = sysid.as_bytes(); - - pgb.write_message_noflush(&BeMessage::RowDescription(&[ - RowDescriptor { - name: b"systemid", - typoid: TEXT_OID, - typlen: -1, - ..Default::default() - }, - RowDescriptor { - name: b"timeline", - typoid: INT4_OID, - typlen: 4, - ..Default::default() - }, - RowDescriptor { - name: b"xlogpos", - typoid: TEXT_OID, - typlen: -1, - ..Default::default() - }, - RowDescriptor { - name: b"dbname", - typoid: TEXT_OID, - typlen: -1, - ..Default::default() - }, - ]))? - .write_message_noflush(&BeMessage::DataRow(&[ - Some(sysid_bytes), - Some(tli_bytes), - Some(lsn_bytes), - None, - ]))? 
- .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?; - Ok(()) - } -} diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index b8063db5d4..96deba84ee 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -18,11 +18,11 @@ use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::zid::{ZTenantId, ZTimelineId}; -use crate::replication::HotStandbyFeedback; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, Storage, SK_FORMAT_VERSION, SK_MAGIC, }; +use crate::send_wal::HotStandbyFeedback; use crate::upgrade::upgrade_control_file; use crate::SafeKeeperConf; use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index a715c0a37f..305e59bcd3 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -9,7 +9,7 @@ use std::thread; use tracing::*; use crate::callmemaybe::CallmeEvent; -use crate::send_wal::SendWalHandler; +use crate::handler::SafekeeperPostgresHandler; use crate::SafeKeeperConf; use tokio::sync::mpsc::UnboundedSender; use zenith_utils::postgres_backend::{AuthType, PostgresBackend}; @@ -60,7 +60,7 @@ fn handle_socket( socket.set_nodelay(true)?; - let mut conn_handler = SendWalHandler::new(conf, tx); + let mut conn_handler = SafekeeperPostgresHandler::new(conf, tx); let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?; // libpq replication protocol between safekeeper and replicas/pagers pgbackend.run(&mut conn_handler)?;
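For orientation, here is a minimal sketch (not part of the patch) of how the renamed handler is driven end to end, pieced together from the handler.rs and wal_service.rs hunks above. It is illustrative only: the serve_connection name is invented, the socket is assumed to be a std::net::TcpStream as in wal_service.rs, and error handling is reduced to the bare minimum.

// Sketch of the per-connection flow after this change (not part of the diff).
// wal_service.rs accepts a socket, builds a SafekeeperPostgresHandler, and lets
// PostgresBackend drive the libpq protocol; the backend calls back into
// Handler::startup (reads ztenantid/ztimelineid/application_name params) and
// Handler::process_query, which dispatches IDENTIFY_SYSTEM, START_REPLICATION,
// START_WAL_PUSH and JSON_CTRL to send_wal.rs, receive_wal.rs and json_ctrl.rs.
use crate::callmemaybe::CallmeEvent;
use crate::handler::SafekeeperPostgresHandler;
use crate::SafeKeeperConf;
use anyhow::Result;
use std::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use zenith_utils::postgres_backend::{AuthType, PostgresBackend};

fn serve_connection(
    socket: TcpStream,
    conf: SafeKeeperConf,
    tx: UnboundedSender<CallmeEvent>,
) -> Result<()> {
    socket.set_nodelay(true)?;
    // Per-connection state: tenant/timeline ids and the timeline handle are
    // filled in later by startup() and process_query().
    let mut conn_handler = SafekeeperPostgresHandler::new(conf, tx);
    let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?;
    pgbackend.run(&mut conn_handler)?;
    Ok(())
}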