From a11558b84fa703f0de67cc44458848b7796abbd6 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Wed, 12 May 2021 22:14:33 -0700 Subject: [PATCH] break wal_service into multiple files + misc cleanups --- walkeeper/src/lib.rs | 2 + walkeeper/src/replication.rs | 241 ++++++++++++++++++ walkeeper/src/send_wal.rs | 151 +++++++++++ walkeeper/src/wal_service.rs | 481 +++++------------------------------ 4 files changed, 451 insertions(+), 424 deletions(-) create mode 100644 walkeeper/src/replication.rs create mode 100644 walkeeper/src/send_wal.rs diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index ab473b569e..5f6287f1f8 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -4,7 +4,9 @@ use std::path::PathBuf; use std::time::Duration; pub mod pq_protocol; +pub mod replication; pub mod s3_offload; +pub mod send_wal; pub mod wal_service; use crate::pq_protocol::SystemId; diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs new file mode 100644 index 0000000000..26aef50279 --- /dev/null +++ b/walkeeper/src/replication.rs @@ -0,0 +1,241 @@ +//! This module implements the replication protocol, starting with the +//! "START REPLICATION" message. 
+ +use crate::pq_protocol::{BeMessage, FeMessage}; +use crate::send_wal::SendWal; +use crate::wal_service::{ + HotStandbyFeedback, Timeline, TimelineTools, END_REPLICATION_MARKER, MAX_SEND_SIZE, +}; +use crate::WalAcceptorConf; +use anyhow::{anyhow, bail, Result}; +use byteorder::{BigEndian, ByteOrder}; +use bytes::{BufMut, Bytes, BytesMut}; +use log::*; +use postgres_ffi::xlog_utils::{get_current_timestamp, XLogFileName}; +use regex::Regex; +use std::cmp::min; +use std::fs::File; +use std::io::{BufReader, Read, Seek, SeekFrom, Write}; +use std::net::TcpStream; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use std::{str, thread}; +use zenith_utils::lsn::Lsn; + +const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */ +const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */ +const LIBPQ_MSG_SIZE_OFFS: usize = 1; + +// FIXME: we don't use consistent endian on this data structure. +// In wal_service it's little-endian, but here it's big-endian. +// FIXME: This function should go away and be replaced by +// derived serde::Deserialize +impl HotStandbyFeedback { + fn parse(body: &Bytes) -> HotStandbyFeedback { + HotStandbyFeedback { + ts: BigEndian::read_u64(&body[0..8]), + xmin: BigEndian::read_u64(&body[8..16]), + catalog_xmin: BigEndian::read_u64(&body[16..24]), + } + } +} + +pub struct ReplicationHandler { + timeline: Option>, + /// Postgres connection, buffered input + /// + /// This is an `Option` because we will spawn a background thread that will + /// `take` it from us. + stream_in: Option>, + /// Postgres connection, output + stream_out: Mutex, + /// wal acceptor configuration + conf: WalAcceptorConf, + /// assigned application name + appname: Option, +} + +impl ReplicationHandler { + /// Create a new `ReplicationHandler`, consuming the `SendWal`. 
+ pub fn new(conn: SendWal) -> Self { + Self { + timeline: conn.timeline, + stream_in: Some(conn.stream_in), + stream_out: Mutex::new(conn.stream_out), + conf: conn.conf, + appname: None, + } + } + + /// Handle incoming messages from the network. + /// + /// This is spawned into the background by `run`. + /// + fn background_thread(mut stream_in: impl Read, timeline: Arc) -> Result<()> { + // Wait for replica's feedback. + // We only handle `CopyData` messages. Anything else is ignored. + loop { + match FeMessage::read_from(&mut stream_in)? { + FeMessage::CopyData(m) => { + timeline.add_hs_feedback(HotStandbyFeedback::parse(&m.body)) + } + msg => { + info!("unexpected message {:?}", msg); + } + } + } + } + + /// Helper function that parses a pair of LSNs. + fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> { + let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); + let caps = re.captures_iter(str::from_utf8(&cmd[..])?); + let mut lsns = caps.map(|cap| cap[1].parse::()); + let start_pos = lsns + .next() + .ok_or_else(|| anyhow!("failed to find start LSN"))??; + let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0)); + Ok((start_pos, stop_pos)) + } + + /// Helper function for opening a wal file. + fn open_wal_file(wal_file_path: &Path) -> Result { + // First try to open the .partial file. + let mut partial_path = wal_file_path.to_owned(); + partial_path.set_extension("partial"); + if let Ok(opened_file) = File::open(&partial_path) { + return Ok(opened_file); + } + + // If that failed, try it without the .partial extension. + match File::open(&wal_file_path) { + Ok(opened_file) => return Ok(opened_file), + Err(e) => { + error!("Failed to open log file {:?}: {}", &wal_file_path, e); + return Err(e.into()); + } + } + } + + /// + /// Handle START_REPLICATION replication command + /// + pub fn run(&mut self, cmd: &Bytes) -> Result<()> { + // spawn the background thread which receives HotStandbyFeedback messages. 
+ let bg_timeline = Arc::clone(self.timeline.get()); + let bg_stream_in = self.stream_in.take().unwrap(); + + thread::spawn(move || { + if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) { + error!("socket error: {}", err); + } + }); + + let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?; + + let wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize; + if wal_seg_size == 0 { + bail!("Can not start replication before connecting to wal_proposer"); + } + let (wal_end, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false); + if start_pos == Lsn(0) { + start_pos = wal_end; + } + if stop_pos == Lsn(0) && self.appname == Some("wal_proposer_recovery".to_string()) { + stop_pos = wal_end; + } + info!("Start replication from {} till {}", start_pos, stop_pos); + + let mut outbuf = BytesMut::new(); + BeMessage::write(&mut outbuf, &BeMessage::Copy); + self.send(&outbuf)?; + outbuf.clear(); + + let mut end_pos: Lsn; + let mut wal_file: Option = None; + + loop { + /* Wait until we have some data to stream */ + if stop_pos != Lsn(0) { + /* recovery mode: stream up to the specified LSN (VCL) */ + if start_pos >= stop_pos { + /* recovery finished */ + break; + } + end_pos = stop_pos; + } else { + /* normal mode */ + let timeline = self.timeline.get(); + end_pos = timeline.wait_for_lsn(start_pos); + } + if end_pos == END_REPLICATION_MARKER { + break; + } + + // Take the `File` from `wal_file`, or open a new file. + let mut file = match wal_file.take() { + Some(file) => file, + None => { + // Open a new file. + let segno = start_pos.segment_number(wal_seg_size as u64); + let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); + let timeline_id = self.timeline.get().timelineid.to_string(); + let wal_file_path = self.conf.data_dir.join(timeline_id).join(wal_file_name); + Self::open_wal_file(&wal_file_path)? 
+ } + }; + + let xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; + + // How much to read and send in message? We cannot cross the WAL file + // boundary, and we don't want to send more than MAX_SEND_SIZE. + let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; + let send_size = min(send_size, wal_seg_size - xlogoff); + let send_size = min(send_size, MAX_SEND_SIZE); + + let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size; + + // Read some data from the file. + let mut file_buf = vec![0u8; send_size]; + file.seek(SeekFrom::Start(xlogoff as u64))?; + file.read_exact(&mut file_buf)?; + + // Write some data to the network socket. + // FIXME: turn these into structs. + // 'd' is CopyData; + // 'w' is "WAL records" + // https://www.postgresql.org/docs/9.1/protocol-message-formats.html + // src/backend/replication/walreceiver.c + outbuf.clear(); + outbuf.put_u8(b'd'); + outbuf.put_u32((msg_size - LIBPQ_MSG_SIZE_OFFS) as u32); + outbuf.put_u8(b'w'); + outbuf.put_u64(start_pos.0); + outbuf.put_u64(end_pos.0); + outbuf.put_u64(get_current_timestamp()); + + assert!(outbuf.len() + file_buf.len() == msg_size); + // FIXME: combine these two into a single send, + // so that no other traffic can be sent in between them. + self.send(&outbuf)?; + self.send(&file_buf)?; + start_pos += send_size as u64; + + debug!("Sent WAL to page server up to {}", end_pos); + + // Decide whether to reuse this file. If we don't set wal_file here + // a new file will be opened next time. + if start_pos.segment_offset(wal_seg_size as u64) != 0 { + wal_file = Some(file); + } + } + Ok(()) + } + + /// Lock the mutex and send bytes on the network. 
+ fn send(&self, buf: &[u8]) -> Result<()> { + let mut writer = self.stream_out.lock().unwrap(); + writer.write_all(buf.as_ref())?; + Ok(()) + } +} diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs new file mode 100644 index 0000000000..59df68e320 --- /dev/null +++ b/walkeeper/src/send_wal.rs @@ -0,0 +1,151 @@ +//! This implements the libpq replication protocol between wal_acceptor and replicas/pagers +//! + +use crate::pq_protocol::{ + BeMessage, FeMessage, FeStartupMessage, RowDescriptor, StartupRequestCode, +}; +use crate::replication::ReplicationHandler; +use crate::wal_service::{Connection, Timeline, TimelineTools}; +use crate::WalAcceptorConf; +use anyhow::{bail, Result}; +use bytes::BytesMut; +use log::*; +use std::io::{BufReader, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::sync::Arc; + +pub struct SendWal { + pub timeline: Option>, + /// Postgres connection, buffered input + pub stream_in: BufReader, + /// Postgres connection, output + pub stream_out: TcpStream, + /// The cached result of socket.peer_addr() + pub peer_addr: SocketAddr, + /// wal acceptor configuration + pub conf: WalAcceptorConf, + /// assigned application name + appname: Option, +} + +impl SendWal { + /// Create a new `SendWal`, consuming the `Connection`. + pub fn new(conn: Connection) -> Self { + Self { + timeline: conn.timeline, + stream_in: conn.stream_in, + stream_out: conn.stream_out, + peer_addr: conn.peer_addr, + conf: conn.conf, + appname: None, + } + } + + /// + /// Send WAL to replica or WAL receiver using standard libpq replication protocol + /// + pub fn run(mut self) -> Result<()> { + let peer_addr = self.peer_addr.clone(); + info!("WAL sender to {:?} is started", peer_addr); + + // Handle the startup message first. 
+ + let m = FeStartupMessage::read_from(&mut self.stream_in)?; + trace!("got startup message {:?}", m); + match m.kind { + StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { + let mut buf = BytesMut::new(); + BeMessage::write(&mut buf, &BeMessage::Negotiate); + info!("SSL requested"); + self.stream_out.write_all(&buf)?; + } + StartupRequestCode::Normal => { + let mut buf = BytesMut::new(); + BeMessage::write(&mut buf, &BeMessage::AuthenticationOk); + BeMessage::write(&mut buf, &BeMessage::ReadyForQuery); + self.stream_out.write_all(&buf)?; + self.timeline.set(m.timelineid)?; + self.appname = m.appname; + } + StartupRequestCode::Cancel => return Ok(()), + } + + loop { + let msg = FeMessage::read_from(&mut self.stream_in)?; + match msg { + FeMessage::Query(q) => { + trace!("got query {:?}", q.body); + + if q.body.starts_with(b"IDENTIFY_SYSTEM") { + self.handle_identify_system()?; + } else if q.body.starts_with(b"START_REPLICATION") { + // Create a new replication object, consuming `self`. 
+ ReplicationHandler::new(self).run(&q.body)?; + break; + } else { + bail!("Unexpected command {:?}", q.body); + } + } + FeMessage::Terminate => { + break; + } + _ => { + bail!("unexpected message"); + } + } + } + info!("WAL sender to {:?} is finished", peer_addr); + Ok(()) + } + + /// + /// Handle IDENTIFY_SYSTEM replication command + /// + fn handle_identify_system(&mut self) -> Result<()> { + let (start_pos, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false); + let lsn = start_pos.to_string(); + let tli = timeline.to_string(); + let sysid = self.timeline.get().get_info().server.system_id.to_string(); + let lsn_bytes = lsn.as_bytes(); + let tli_bytes = tli.as_bytes(); + let sysid_bytes = sysid.as_bytes(); + + let mut outbuf = BytesMut::new(); + BeMessage::write( + &mut outbuf, + &BeMessage::RowDescription(&[ + RowDescriptor { + name: b"systemid\0", + typoid: 25, + typlen: -1, + }, + RowDescriptor { + name: b"timeline\0", + typoid: 23, + typlen: 4, + }, + RowDescriptor { + name: b"xlogpos\0", + typoid: 25, + typlen: -1, + }, + RowDescriptor { + name: b"dbname\0", + typoid: 25, + typlen: -1, + }, + ]), + ); + BeMessage::write( + &mut outbuf, + &BeMessage::DataRow(&[Some(sysid_bytes), Some(tli_bytes), Some(lsn_bytes), None]), + ); + BeMessage::write( + &mut outbuf, + &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"), + ); + BeMessage::write(&mut outbuf, &BeMessage::ReadyForQuery); + self.stream_out.write_all(&outbuf)?; + Ok(()) + } +} diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index dd3fd77657..992baca7f4 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -4,7 +4,7 @@ //! 
use anyhow::{bail, Result}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use fs2::FileExt; use lazy_static::lazy_static; use log::*; @@ -24,6 +24,7 @@ use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use crate::pq_protocol::*; +use crate::send_wal::SendWal; use crate::WalAcceptorConf; use pageserver::ZTimelineId; use postgres_ffi::xlog_utils::{ @@ -36,34 +37,31 @@ const SK_MAGIC: u32 = 0xCafeCeefu32; const SK_FORMAT_VERSION: u32 = 1; const SK_PROTOCOL_VERSION: u32 = 1; const UNKNOWN_SERVER_VERSION: u32 = 0; -const END_REPLICATION_MARKER: Lsn = Lsn::MAX; -const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; -const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */ -const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */ -const LIBPQ_MSG_SIZE_OFFS: usize = 1; +pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; +pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; const CONTROL_FILE_NAME: &str = "safekeeper.control"; const END_OF_STREAM: Lsn = Lsn(0); /// Unique node identifier used by Paxos #[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] -struct NodeId { +pub struct NodeId { term: u64, uuid: [u8; 16], } #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -struct ServerInfo { +pub struct ServerInfo { /// proxy-safekeeper protocol version - protocol_version: u32, + pub protocol_version: u32, /// Postgres server version - pg_version: u32, - node_id: NodeId, - system_id: SystemId, + pub pg_version: u32, + pub node_id: NodeId, + pub system_id: SystemId, /// Zenith timelineid - timeline_id: ZTimelineId, - wal_end: Lsn, - timeline: TimeLineID, - wal_seg_size: u32, + pub timeline_id: ZTimelineId, + pub wal_end: Lsn, + pub timeline: TimeLineID, + pub wal_seg_size: u32, } /// Vote request sent from proxy to safekeepers @@ -78,29 +76,29 @@ struct RequestVote { /// Information of about 
storage node #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -struct SafeKeeperInfo { +pub struct SafeKeeperInfo { /// magic for verifying content the control file - magic: u32, + pub magic: u32, /// safekeeper format version - format_version: u32, + pub format_version: u32, /// safekeeper's epoch - epoch: u64, + pub epoch: u64, /// information about server - server: ServerInfo, + pub server: ServerInfo, /// part of WAL acknowledged by quorum - commit_lsn: Lsn, + pub commit_lsn: Lsn, /// locally flushed part of WAL - flush_lsn: Lsn, + pub flush_lsn: Lsn, /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers - restart_lsn: Lsn, + pub restart_lsn: Lsn, } /// Hot standby feedback received from replica #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] -struct HotStandbyFeedback { - ts: TimestampTz, - xmin: FullTransactionId, - catalog_xmin: FullTransactionId, +pub struct HotStandbyFeedback { + pub ts: TimestampTz, + pub xmin: FullTransactionId, + pub catalog_xmin: FullTransactionId, } /// Request with WAL message sent from proxy to safekeeper. @@ -142,14 +140,14 @@ struct SharedState { /// Database instance (tenant) #[derive(Debug)] pub struct Timeline { - timelineid: ZTimelineId, + pub timelineid: ZTimelineId, mutex: Mutex, /// conditional variable used to notify wal senders cond: Condvar, } // Useful utilities needed by various Connection-like objects -trait TimelineTools { +pub trait TimelineTools { fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>; fn get(&self) -> &Arc; fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID); @@ -176,22 +174,19 @@ impl TimelineTools for Option> { } } -/// Private data #[derive(Debug)] pub struct Connection { - timeline: Option>, + pub timeline: Option>, /// Postgres connection, buffered input - stream_in: BufReader, + pub stream_in: BufReader, /// Postgres connection, output FIXME: To buffer, or not to buffer? 
flush() is a pain. - stream_out: TcpStream, + pub stream_out: TcpStream, /// The cached result of socket.peer_addr() - peer_addr: SocketAddr, - /// input buffer - //inbuf: BytesMut, + pub peer_addr: SocketAddr, /// output buffer outbuf: BytesMut, /// wal acceptor configuration - conf: WalAcceptorConf, + pub conf: WalAcceptorConf, } /// Serde adapter for BytesMut @@ -238,16 +233,6 @@ impl SafeKeeperInfo { } } -impl HotStandbyFeedback { - fn parse(body: &Bytes) -> HotStandbyFeedback { - HotStandbyFeedback { - ts: BigEndian::read_u64(&body[0..8]), - xmin: BigEndian::read_u64(&body[8..16]), - catalog_xmin: BigEndian::read_u64(&body[16..24]), - } - } -} - lazy_static! { pub static ref TIMELINES: Mutex>> = Mutex::new(HashMap::new()); @@ -328,6 +313,23 @@ impl Timeline { } } + /// Wait for an LSN to be committed. + /// + /// Returns the last committed LSN, which will be at least + /// as high as the LSN waited for. + /// + pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn { + let mut shared_state = self.mutex.lock().unwrap(); + loop { + let commit_lsn = shared_state.commit_lsn; + // This must be `>`, not `>=`. 
+ if commit_lsn > lsn { + return commit_lsn; + } + shared_state = self.cond.wait(shared_state).unwrap(); + } + } + // Notify caught-up WAL senders about new WAL data received fn notify_wal_senders(&self, commit_lsn: Lsn) { let mut shared_state = self.mutex.lock().unwrap(); @@ -341,7 +343,7 @@ impl Timeline { self.notify_wal_senders(END_REPLICATION_MARKER); } - fn get_info(&self) -> SafeKeeperInfo { + pub fn get_info(&self) -> SafeKeeperInfo { return self.mutex.lock().unwrap().info; } @@ -350,7 +352,7 @@ impl Timeline { } // Accumulate hot standby feedbacks from replicas - fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { + pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { let mut shared_state = self.mutex.lock().unwrap(); shared_state.hs_feedback.xmin = min(shared_state.hs_feedback.xmin, feedback.xmin); shared_state.hs_feedback.catalog_xmin = @@ -466,10 +468,10 @@ impl Connection { } fn run(mut self) -> Result<()> { - // Peek at the first 4 bytes of the incoming data, to determine which protocol - // is being spoken. - // `fill_buf` does not consume any of the bytes we peek at; they are left - // in the BufReader's internal buffer for the next reader. + // Peek at the first 4 bytes of the incoming data, to determine which + // protocol is being spoken. `fill_buf` does not consume any of the + // bytes we peek at; they are left in the BufReader's internal buffer + // for the next reader. let peek_buf = self.stream_in.fill_buf()?; if peek_buf.len() < 4 { // Empty peek_buf means the socket was closed. 
@@ -483,7 +485,7 @@ impl Connection { self.stream_in.read_u32::()?; self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor } else { - send_wal::SendWal::new(self).run()?; // libpq replication protocol between wal_acceptor and replicas/pagers + SendWal::new(self).run()?; // libpq replication protocol between wal_acceptor and replicas/pagers } Ok(()) } @@ -800,372 +802,3 @@ impl Connection { Ok(()) } } - -mod send_wal { - use super::{ - Connection, HotStandbyFeedback, Timeline, TimelineTools, END_REPLICATION_MARKER, - LIBPQ_HDR_SIZE, LIBPQ_MSG_SIZE_OFFS, MAX_SEND_SIZE, XLOG_HDR_SIZE, - }; - use crate::pq_protocol::{ - BeMessage, FeMessage, FeStartupMessage, RowDescriptor, StartupRequestCode, - }; - use crate::WalAcceptorConf; - use anyhow::{anyhow, bail, Result}; - use bytes::{BufMut, Bytes, BytesMut}; - use log::*; - use postgres_ffi::xlog_utils::{get_current_timestamp, XLogFileName}; - use regex::Regex; - use std::cmp::min; - use std::fs::File; - use std::io::{BufReader, Read, Seek, SeekFrom, Write}; - use std::net::{SocketAddr, TcpStream}; - use std::path::Path; - use std::sync::{Arc, Mutex}; - use std::{str, thread}; - use zenith_utils::lsn::Lsn; - - pub struct SendWal { - timeline: Option>, - /// Postgres connection, buffered input - stream_in: BufReader, - /// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain. - stream_out: TcpStream, - /// The cached result of socket.peer_addr() - peer_addr: SocketAddr, - /// wal acceptor configuration - conf: WalAcceptorConf, - /// assigned application name - appname: Option, - } - - impl SendWal { - /// Create a new `SendWal`, consuming the `Connection`. 
- pub fn new(conn: Connection) -> Self { - Self { - timeline: conn.timeline, - stream_in: conn.stream_in, - stream_out: conn.stream_out, - peer_addr: conn.peer_addr, - conf: conn.conf, - appname: None, - } - } - - /// - /// Send WAL to replica or WAL receiver using standard libpq replication protocol - /// - pub fn run(mut self) -> Result<()> { - let peer_addr = self.peer_addr.clone(); - info!("WAL sender to {:?} is started", peer_addr); - - // Handle the startup message first. - - let m = FeStartupMessage::read_from(&mut self.stream_in)?; - trace!("got startup message {:?}", m); - match m.kind { - StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { - let mut buf = BytesMut::new(); - BeMessage::write(&mut buf, &BeMessage::Negotiate); - info!("SSL requested"); - self.stream_out.write_all(&buf)?; - } - StartupRequestCode::Normal => { - let mut buf = BytesMut::new(); - BeMessage::write(&mut buf, &BeMessage::AuthenticationOk); - BeMessage::write(&mut buf, &BeMessage::ReadyForQuery); - self.stream_out.write_all(&buf)?; - self.timeline.set(m.timelineid)?; - self.appname = m.appname; - } - StartupRequestCode::Cancel => return Ok(()), - } - - loop { - let msg = FeMessage::read_from(&mut self.stream_in)?; - match msg { - FeMessage::Query(q) => { - trace!("got query {:?}", q.body); - - if q.body.starts_with(b"IDENTIFY_SYSTEM") { - self.handle_identify_system()?; - } else if q.body.starts_with(b"START_REPLICATION") { - // Create a new replication object, consuming `self`. 
- let mut replication = ReplicationHandler::new(self); - replication.run(&q.body)?; - break; - } else { - bail!("Unexpected command {:?}", q.body); - } - } - FeMessage::Terminate => { - break; - } - _ => { - bail!("unexpected message"); - } - } - } - info!("WAL sender to {:?} is finished", peer_addr); - Ok(()) - } - - /// - /// Handle IDENTIFY_SYSTEM replication command - /// - fn handle_identify_system(&mut self) -> Result<()> { - let (start_pos, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false); - let lsn = start_pos.to_string(); - let tli = timeline.to_string(); - let sysid = self.timeline.get().get_info().server.system_id.to_string(); - let lsn_bytes = lsn.as_bytes(); - let tli_bytes = tli.as_bytes(); - let sysid_bytes = sysid.as_bytes(); - - let mut outbuf = BytesMut::new(); - BeMessage::write( - &mut outbuf, - &BeMessage::RowDescription(&[ - RowDescriptor { - name: b"systemid\0", - typoid: 25, - typlen: -1, - }, - RowDescriptor { - name: b"timeline\0", - typoid: 23, - typlen: 4, - }, - RowDescriptor { - name: b"xlogpos\0", - typoid: 25, - typlen: -1, - }, - RowDescriptor { - name: b"dbname\0", - typoid: 25, - typlen: -1, - }, - ]), - ); - BeMessage::write( - &mut outbuf, - &BeMessage::DataRow(&[Some(sysid_bytes), Some(tli_bytes), Some(lsn_bytes), None]), - ); - BeMessage::write( - &mut outbuf, - &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"), - ); - BeMessage::write(&mut outbuf, &BeMessage::ReadyForQuery); - self.stream_out.write_all(&outbuf)?; - Ok(()) - } - } - - pub struct ReplicationHandler { - timeline: Option>, - /// Postgres connection, buffered input - stream_in: Option>, - /// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain. - stream_out: Mutex, - /// wal acceptor configuration - conf: WalAcceptorConf, - /// assigned application name - appname: Option, - } - - impl ReplicationHandler { - /// Create a new `SendWal`, consuming the `Connection`. 
- pub fn new(conn: SendWal) -> Self { - Self { - timeline: conn.timeline, - stream_in: Some(conn.stream_in), - stream_out: Mutex::new(conn.stream_out), - conf: conn.conf, - appname: None, - } - } - - /// Handle incoming messages from the network. - /// - /// This is spawned into the background by `handle_start_replication`. - /// - fn background_thread(mut stream_in: impl Read, timeline: Arc) -> Result<()> { - // Wait for replica's feedback. - // We only handle `CopyData` messages. Anything else is ignored. - - loop { - match FeMessage::read_from(&mut stream_in)? { - FeMessage::CopyData(m) => { - timeline.add_hs_feedback(HotStandbyFeedback::parse(&m.body)) - } - msg => { - info!("unexpected message {:?}", msg); - } - } - } - } - - /// Helper function that parses a pair of LSNs. - fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> { - let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); - let caps = re.captures_iter(str::from_utf8(&cmd[..])?); - let mut lsns = caps.map(|cap| cap[1].parse::()); - let start_pos = lsns - .next() - .ok_or_else(|| anyhow!("failed to find start LSN"))??; - let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0)); - Ok((start_pos, stop_pos)) - } - - /// Helper function for opening a wal file. - fn open_wal_file(wal_file_path: &Path) -> Result { - // First try to open the .partial file. - let mut partial_path = wal_file_path.to_owned(); - partial_path.set_extension("partial"); - if let Ok(opened_file) = File::open(&partial_path) { - return Ok(opened_file); - } - - // If that failed, try it without the .partial extension. - match File::open(&wal_file_path) { - Ok(opened_file) => return Ok(opened_file), - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - - /// - /// Handle START_REPLICATION replication command - /// - fn run(&mut self, cmd: &Bytes) -> Result<()> { - // spawn the background thread which receives HotStandbyFeedback messages. 
- let bg_timeline = Arc::clone(self.timeline.get()); - let bg_stream_in = self.stream_in.take().unwrap(); - - thread::spawn(move || { - if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) { - error!("socket error: {}", err); - } - }); - - let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?; - - let wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize; - if wal_seg_size == 0 { - bail!("Can not start replication before connecting to wal_proposer"); - } - let (wal_end, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false); - if start_pos == Lsn(0) { - start_pos = wal_end; - } - if stop_pos == Lsn(0) && self.appname == Some("wal_proposer_recovery".to_string()) { - stop_pos = wal_end; - } - info!("Start replication from {} till {}", start_pos, stop_pos); - - let mut outbuf = BytesMut::new(); - BeMessage::write(&mut outbuf, &BeMessage::Copy); - self.send(&outbuf)?; - outbuf.clear(); - - let mut end_pos: Lsn; - let mut commit_lsn: Lsn; - let mut wal_file: Option = None; - - loop { - /* Wait until we have some data to stream */ - if stop_pos != Lsn(0) { - /* recovery mode: stream up to the specified LSN (VCL) */ - if start_pos >= stop_pos { - /* recovery finished */ - break; - } - end_pos = stop_pos; - } else { - /* normal mode */ - let timeline = self.timeline.get(); - let mut shared_state = timeline.mutex.lock().unwrap(); - loop { - commit_lsn = shared_state.commit_lsn; - if start_pos < commit_lsn { - end_pos = commit_lsn; - break; - } - shared_state = timeline.cond.wait(shared_state).unwrap(); - } - } - if end_pos == END_REPLICATION_MARKER { - break; - } - - // Take the `File` from `wal_file`, or open a new file. - let mut file = match wal_file.take() { - Some(file) => file, - None => { - // Open a new file. 
- let segno = start_pos.segment_number(wal_seg_size as u64); - let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let timeline_id = self.timeline.get().timelineid.to_string(); - let wal_file_path = - self.conf.data_dir.join(timeline_id).join(wal_file_name); - Self::open_wal_file(&wal_file_path)? - } - }; - - let xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; - - // How much to read and send in message? We cannot cross the WAL file - // boundary, and we don't want send more than MAX_SEND_SIZE. - let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - let send_size = min(send_size, wal_seg_size - xlogoff); - let send_size = min(send_size, MAX_SEND_SIZE); - - let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size; - - // Read some data from the file. - let mut file_buf = vec![0u8; send_size]; - file.seek(SeekFrom::Start(xlogoff as u64))?; - file.read_exact(&mut file_buf)?; - - // Write some data to the network socket. - // FIXME: turn these into structs. - // 'd' is CopyData; - // 'w' is "WAL records" - // https://www.postgresql.org/docs/9.1/protocol-message-formats.html - // src/backend/replication/walreceiver.c - outbuf.clear(); - outbuf.put_u8(b'd'); - outbuf.put_u32((msg_size - LIBPQ_MSG_SIZE_OFFS) as u32); - outbuf.put_u8(b'w'); - outbuf.put_u64(start_pos.0); - outbuf.put_u64(end_pos.0); - outbuf.put_u64(get_current_timestamp()); - - assert!(outbuf.len() + file_buf.len() == msg_size); - // FIXME: combine these two into a single send, - // so that no other traffic can be sent in between them. - self.send(&outbuf)?; - self.send(&file_buf)?; - start_pos += send_size as u64; - - debug!("Sent WAL to page server up to {}", end_pos); - - // Decide whether to reuse this file. If we don't set wal_file here - // a new file will be opened next time. - if start_pos.segment_offset(wal_seg_size as u64) != 0 { - wal_file = Some(file); - } - } - Ok(()) - } - - /// Unlock the mutex and send bytes on the network. 
- fn send(&self, buf: &[u8]) -> Result<()> { - let mut writer = self.stream_out.lock().unwrap(); - writer.write_all(buf.as_ref())?; - Ok(()) - } - } -}