diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index b099887940..836c9272a5 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -27,6 +27,8 @@ pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = XLP_REM_LEN_OFFS + 4 + 4; pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4; pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2; pub const XLOG_SIZE_OF_XLOG_RECORD: usize = XLOG_RECORD_CRC_OFFS + 4; +pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; + pub type XLogRecPtr = u64; pub type TimeLineID = u32; pub type TimestampTz = u64; diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 8adcefcd6f..e296d8ec82 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use std::time::Duration; pub mod pq_protocol; +pub mod receive_wal; pub mod replication; pub mod s3_offload; pub mod send_wal; diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs new file mode 100644 index 0000000000..47fc734197 --- /dev/null +++ b/walkeeper/src/receive_wal.rs @@ -0,0 +1,447 @@ +//! This implements the Safekeeper protocol. +//! +//! FIXME: better description needed here + +use anyhow::{bail, Result}; +use log::*; +use postgres::{Client, NoTls}; +use serde::{Deserialize, Serialize}; +use std::cmp::{max, min}; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufReader, Read, Seek, SeekFrom, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::str; +use std::sync::Arc; +use zenith_utils::bin_ser::LeSer; +use zenith_utils::lsn::Lsn; + +use crate::pq_protocol::*; +use crate::replication::HotStandbyFeedback; +use crate::timeline::{Timeline, TimelineTools}; +use crate::WalAcceptorConf; +use pageserver::ZTimelineId; +use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; + +pub const SK_MAGIC: u32 = 0xCafeCeefu32; +pub const SK_FORMAT_VERSION: u32 = 1; +const SK_PROTOCOL_VERSION: u32 = 1; +const UNKNOWN_SERVER_VERSION: u32 = 0; +const END_OF_STREAM: Lsn = Lsn(0); +pub const CONTROL_FILE_NAME: &str = "safekeeper.control"; + +/// Unique node identifier used by Paxos +#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] +pub struct NodeId { + term: u64, + uuid: [u8; 16], +} + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub struct ServerInfo { + /// proxy-safekeeper protocol version + pub protocol_version: u32, + /// Postgres server version + pub pg_version: u32, + pub node_id: NodeId, + pub system_id: SystemId, + /// Zenith timelineid + pub timeline_id: ZTimelineId, + pub wal_end: Lsn, + pub timeline: TimeLineID, + pub wal_seg_size: u32, +} + +/// Vote request sent from proxy to safekeepers +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct RequestVote { + node_id: NodeId, + /// volume commit LSN + vcl: Lsn, + /// new epoch when safekeeper reaches vcl + epoch: u64, +} + +/// Information of about storage node +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SafeKeeperInfo { + /// magic for verifying content the control file + pub magic: u32, + /// safekeeper format version + pub format_version: u32, + /// safekeeper's epoch + pub epoch: u64, + /// information about server + pub server: ServerInfo, + /// part of WAL acknowledged by quorum + pub commit_lsn: Lsn, + /// locally flushed part of WAL + pub flush_lsn: Lsn, + /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers + pub restart_lsn: Lsn, +} + +impl SafeKeeperInfo { + pub fn new() -> SafeKeeperInfo { + SafeKeeperInfo { + magic: SK_MAGIC, + format_version: SK_FORMAT_VERSION, + epoch: 0, + server: ServerInfo { + protocol_version: SK_PROTOCOL_VERSION, /* proxy-safekeeper protocol version */ + pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ + node_id: NodeId { + term: 0, + uuid: [0; 16], + }, + system_id: 0, /* Postgres system identifier */ + timeline_id: ZTimelineId::from([0u8; 16]), + wal_end: Lsn(0), + timeline: 0, + wal_seg_size: 0, + }, + commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ + flush_lsn: Lsn(0), /* locally flushed part of WAL */ + restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ + } + } +} + +/// Request with WAL message sent from proxy to safekeeper. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct SafeKeeperRequest { + /// Sender's node identifier (looks like we do not need it for TCP streaming connection) + sender_id: NodeId, + /// start position of message in WAL + begin_lsn: Lsn, + /// end position of message in WAL + end_lsn: Lsn, + /// restart LSN position (minimal LSN which may be needed by proxy to perform recovery) + restart_lsn: Lsn, + /// LSN committed by quorum of safekeepers + commit_lsn: Lsn, +} + +/// Report safekeeper state to proxy +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct SafeKeeperResponse { + epoch: u64, + flush_lsn: Lsn, + hs_feedback: HotStandbyFeedback, +} + +#[derive(Debug)] +pub struct ReceiveWalConn { + pub timeline: Option>, + /// Postgres connection, buffered input + pub stream_in: BufReader, + /// Postgres connection, output + pub stream_out: TcpStream, + /// The cached result of socket.peer_addr() + pub peer_addr: SocketAddr, + /// wal acceptor configuration + pub conf: WalAcceptorConf, +} + +impl ReceiveWalConn { + pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result { + let peer_addr = socket.peer_addr()?; + let conn = ReceiveWalConn { + timeline: None, + stream_in: BufReader::new(socket.try_clone()?), + stream_out: socket, + peer_addr, + conf, + }; + Ok(conn) + } + + fn read_req(&mut self) -> Result { + // As the trait bound implies, this always encodes little-endian. + Ok(T::des_from(&mut self.stream_in)?) + } + + fn request_callback(&self) -> std::result::Result<(), postgres::error::Error> { + if let Some(addr) = self.conf.pageserver_addr { + let ps_connstr = format!( + "host={} port={} dbname={} user={}", + addr.ip(), + addr.port(), + "no_db", + "no_user", + ); + let callme = format!( + "callmemaybe {} host={} port={} options='-c ztimelineid={}'", + self.timeline.get().timelineid, + self.conf.listen_addr.ip(), + self.conf.listen_addr.port(), + self.timeline.get().timelineid + ); + info!( + "requesting page server to connect to us: start {} {}", + ps_connstr, callme + ); + let mut client = Client::connect(&ps_connstr, NoTls)?; + client.simple_query(&callme)?; + } + Ok(()) + } + + /// Receive WAL from wal_proposer + pub fn run(&mut self) -> Result<()> { + // Receive information about server + let server_info = self.read_req::()?; + info!( + "Start handshake with wal_proposer {} sysid {} timeline {}", + self.peer_addr, server_info.system_id, server_info.timeline_id, + ); + // FIXME: also check that the system identifier matches + self.timeline.set(server_info.timeline_id)?; + self.timeline.get().load_control_file(&self.conf)?; + + let mut my_info = self.timeline.get().get_info(); + + /* Check protocol compatibility */ + if server_info.protocol_version != SK_PROTOCOL_VERSION { + bail!( + "Incompatible protocol version {}, expected {}", + server_info.protocol_version, + SK_PROTOCOL_VERSION + ); + } + /* Postgres upgrade is not treated as fatal error */ + if server_info.pg_version != my_info.server.pg_version + && my_info.server.pg_version != UNKNOWN_SERVER_VERSION + { + info!( + "Incompatible server version {}, expected {}", + server_info.pg_version, my_info.server.pg_version + ); + } + + /* Update information about server, but preserve locally stored node_id */ + let node_id = my_info.server.node_id; + my_info.server = server_info; + my_info.server.node_id = node_id; + + /* Calculate WAL end based on local data */ + let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true); + my_info.flush_lsn = flush_lsn; + my_info.server.timeline = timeline; + + /* Report my identifier to proxy */ + my_info.ser_into(&mut self.stream_out)?; + + /* Wait for vote request */ + let prop = self.read_req::()?; + /* This is Paxos check which should ensure that only one master can perform commits */ + if prop.node_id < my_info.server.node_id { + /* Send my node-id to inform proxy that it's candidate was rejected */ + my_info.server.node_id.ser_into(&mut self.stream_out)?; + bail!( + "Reject connection attempt with term {} because my term is {}", + prop.node_id.term, + my_info.server.node_id.term, + ); + } + my_info.server.node_id = prop.node_id; + self.timeline.get().set_info(&my_info); + /* Need to persist our vote first */ + self.timeline.get().save_control_file(true)?; + + let mut flushed_restart_lsn = Lsn(0); + let wal_seg_size = server_info.wal_seg_size as usize; + + /* Acknowledge the proposed candidate by returning it to the proxy */ + prop.node_id.ser_into(&mut self.stream_out)?; + + // Need to establish replication channel with page server. + // Add far as replication in postgres is initiated by receiver, we should use callme mechanism + if let Err(e) = self.request_callback() { + // Do not treate it as fatal error and continue work + // FIXME: we should retry after a while... + error!("Failed to send callme request to pageserver: {}", e); + } + + info!( + "Start streaming from timeline {} address {:?}", + server_info.timeline_id, self.peer_addr, + ); + + // Main loop + loop { + let mut sync_control_file = false; + + /* Receive message header */ + let req = self.read_req::()?; + if req.sender_id != my_info.server.node_id { + bail!("Sender NodeId is changed"); + } + if req.begin_lsn == END_OF_STREAM { + info!("Server stops streaming"); + break; + } + let start_pos = req.begin_lsn; + let end_pos = req.end_lsn; + let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; + assert!(rec_size <= MAX_SEND_SIZE); + + debug!( + "received for {} bytes between {} and {}", + rec_size, start_pos, end_pos, + ); + + /* Receive message body */ + let mut inbuf = vec![0u8; rec_size]; + self.stream_in.read_exact(&mut inbuf)?; + + /* Save message in file */ + self.write_wal_file(start_pos, timeline, wal_seg_size, &inbuf)?; + + my_info.restart_lsn = req.restart_lsn; + my_info.commit_lsn = req.commit_lsn; + + /* + * Epoch switch happen when written WAL record cross the boundary. + * The boundary is maximum of last WAL position at this node (FlushLSN) and global + * maximum (vcl) determined by safekeeper_proxy during handshake. + * Switching epoch means that node completes recovery and start writing in the WAL new data. + */ + if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) { + info!("Switch to new epoch {}", prop.epoch); + my_info.epoch = prop.epoch; /* bump epoch */ + sync_control_file = true; + } + if end_pos > my_info.flush_lsn { + my_info.flush_lsn = end_pos; + } + /* + * Update restart LSN in control file. + * To avoid negative impact on performance of extra fsync, do it only + * when restart_lsn delta exceeds WAL segment size. + */ + sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; + self.timeline.get().save_control_file(sync_control_file)?; + + if sync_control_file { + flushed_restart_lsn = my_info.restart_lsn; + } + + /* Report flush position */ + //info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32); + let resp = SafeKeeperResponse { + epoch: my_info.epoch, + flush_lsn: end_pos, + hs_feedback: self.timeline.get().get_hs_feedback(), + }; + resp.ser_into(&mut self.stream_out)?; + + /* + * Ping wal sender that new data is available. + * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. + */ + self.timeline + .get() + .notify_wal_senders(min(req.commit_lsn, end_pos)); + } + Ok(()) + } + + fn write_wal_file( + &self, + startpos: Lsn, + timeline: TimeLineID, + wal_seg_size: usize, + buf: &[u8], + ) -> Result<()> { + let mut bytes_left: usize = buf.len(); + let mut bytes_written: usize = 0; + let mut partial; + let mut start_pos = startpos; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + + /* Extract WAL location for this block */ + let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; + + while bytes_left != 0 { + let bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach wal + * segment size. + */ + if xlogoff + bytes_left > wal_seg_size { + bytes_to_write = wal_seg_size - xlogoff; + } else { + bytes_to_write = bytes_left; + } + + /* Open file */ + let segno = start_pos.segment_number(wal_seg_size as u64); + let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); + let wal_file_path = self + .conf + .data_dir + .join(self.timeline.get().timelineid.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .data_dir + .join(self.timeline.get().timelineid.to_string()) + .join(wal_file_name.clone() + ".partial"); + + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) + { + /* Try to open existed partial file */ + wal_file = file; + partial = true; + } else { + /* Create and fill new partial file */ + partial = true; + match OpenOptions::new() + .create(true) + .write(true) + .open(&wal_file_partial_path) + { + Ok(mut file) => { + for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { + file.write_all(&ZERO_BLOCK)?; + } + wal_file = file; + } + Err(e) => { + error!("Failed to open log file {:?}: {}", &wal_file_path, e); + return Err(e.into()); + } + } + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; + + // Flush file is not prohibited + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + start_pos += bytes_to_write as u64; + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? */ + if start_pos.segment_offset(wal_seg_size as u64) == 0 { + xlogoff = 0; + if partial { + fs::rename(&wal_file_partial_path, &wal_file_path)?; + } + } + } + Ok(()) + } +} diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 4f7abe0e10..87faa57bfc 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -4,13 +4,13 @@ use crate::pq_protocol::{BeMessage, FeMessage}; use crate::send_wal::SendWalConn; use crate::timeline::{Timeline, TimelineTools}; -use crate::wal_service::{HotStandbyFeedback, END_REPLICATION_MARKER, MAX_SEND_SIZE}; use crate::WalAcceptorConf; use anyhow::{anyhow, bail, Result}; use bytes::{BufMut, Bytes, BytesMut}; use log::*; -use postgres_ffi::xlog_utils::{get_current_timestamp, XLogFileName}; +use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE}; use regex::Regex; +use serde::{Deserialize, Serialize}; use std::cmp::min; use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom, Write}; @@ -24,6 +24,17 @@ use zenith_utils::lsn::Lsn; const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */ const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */ const LIBPQ_MSG_SIZE_OFFS: usize = 1; +pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; + +type FullTransactionId = u64; + +/// Hot standby feedback received from replica +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct HotStandbyFeedback { + pub ts: TimestampTz, + pub xmin: FullTransactionId, + pub catalog_xmin: FullTransactionId, +} /// A network connection that's speaking the replication protocol. pub struct ReplicationConn { diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 1642aaa787..07cad4c081 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -1,19 +1,136 @@ //! This module contains tools for managing timelines. //! -use crate::wal_service::{HotStandbyFeedback, SafeKeeperInfo, SharedState, END_REPLICATION_MARKER}; -use crate::WalAcceptorConf; -use anyhow::Result; +use anyhow::{bail, Result}; +use fs2::FileExt; use lazy_static::lazy_static; use log::*; use pageserver::ZTimelineId; use postgres_ffi::xlog_utils::{find_end_of_wal, TimeLineID}; +use std::cmp::{max, min}; use std::collections::HashMap; -use std::fs; +use std::fs::{self, File, OpenOptions}; +use std::io::{Seek, SeekFrom}; use std::path::Path; use std::sync::{Arc, Condvar, Mutex}; +use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; +use crate::receive_wal::{SafeKeeperInfo, CONTROL_FILE_NAME, SK_FORMAT_VERSION, SK_MAGIC}; +use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; +use crate::WalAcceptorConf; + +/// Shared state associated with database instance (tenant) +#[derive(Debug)] +pub struct SharedState { + /// quorum commit LSN + pub commit_lsn: Lsn, + /// information about this safekeeper + pub info: SafeKeeperInfo, + /// opened file control file handle (needed to hold exlusive file lock + pub control_file: Option, + /// combined hot standby feedback from all replicas + pub hs_feedback: HotStandbyFeedback, +} + +impl SharedState { + pub fn new() -> Self { + Self { + commit_lsn: Lsn(0), + info: SafeKeeperInfo::new(), + control_file: None, + hs_feedback: HotStandbyFeedback { + ts: 0, + xmin: u64::MAX, + catalog_xmin: u64::MAX, + }, + } + } + + /// Accumulate hot standby feedbacks from replicas + pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) { + self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin); + self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin); + self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts); + } + + /// Load and lock control file (prevent running more than one instance of safekeeper) + pub fn load_control_file( + &mut self, + conf: &WalAcceptorConf, + timelineid: ZTimelineId, + ) -> Result<()> { + if self.control_file.is_some() { + info!("control file for timeline {} is already open", timelineid); + return Ok(()); + } + + let control_file_path = conf + .data_dir + .join(timelineid.to_string()) + .join(CONTROL_FILE_NAME); + info!("loading control file {}", control_file_path.display()); + match OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&control_file_path) + { + Ok(file) => { + // Lock file to prevent two or more active wal_acceptors + match file.try_lock_exclusive() { + Ok(()) => {} + Err(e) => { + bail!( + "Control file {:?} is locked by some other process: {}", + &control_file_path, + e + ); + } + } + self.control_file = Some(file); + + let cfile_ref = self.control_file.as_mut().unwrap(); + match SafeKeeperInfo::des_from(cfile_ref) { + Err(e) => { + warn!("read from {:?} failed: {}", control_file_path, e); + } + Ok(info) => { + if info.magic != SK_MAGIC { + bail!("Invalid control file magic: {}", info.magic); + } + if info.format_version != SK_FORMAT_VERSION { + bail!( + "Incompatible format version: {} vs. {}", + info.format_version, + SK_FORMAT_VERSION + ); + } + self.info = info; + } + } + } + Err(e) => { + panic!( + "Failed to open control file {:?}: {}", + &control_file_path, e + ); + } + } + Ok(()) + } + + pub fn save_control_file(&mut self, sync: bool) -> Result<()> { + let file = self.control_file.as_mut().unwrap(); + file.seek(SeekFrom::Start(0))?; + self.info.ser_into(file)?; + if sync { + file.sync_all()?; + } + Ok(()) + } +} + /// Database instance (tenant) #[derive(Debug)] pub struct Timeline { diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index eb005059f7..5e60bb646e 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -2,270 +2,15 @@ //! WAL service listens for client connections and //! receive WAL from wal_proposer and send it to WAL receivers //! -use anyhow::{bail, Result}; -use fs2::FileExt; +use anyhow::Result; use log::*; -use postgres::{Client, NoTls}; -use serde::{Deserialize, Serialize}; -use std::cmp::{max, min}; -use std::fs::{self, File, OpenOptions}; -use std::io::{BufReader, Read, Seek, SeekFrom, Write}; -use std::net::{SocketAddr, TcpListener, TcpStream}; -use std::str; -use std::sync::Arc; +use std::io::Read; +use std::net::{TcpListener, TcpStream}; use std::thread; -use zenith_utils::bin_ser::LeSer; -use zenith_utils::lsn::Lsn; -use crate::pq_protocol::*; +use crate::receive_wal::ReceiveWalConn; use crate::send_wal::SendWalConn; -use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; -use pageserver::ZTimelineId; -use postgres_ffi::xlog_utils::{TimeLineID, TimestampTz, XLogFileName, XLOG_BLCKSZ}; - -type FullTransactionId = u64; - -const SK_MAGIC: u32 = 0xCafeCeefu32; -const SK_FORMAT_VERSION: u32 = 1; -const SK_PROTOCOL_VERSION: u32 = 1; -const UNKNOWN_SERVER_VERSION: u32 = 0; -pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; -pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; -const END_OF_STREAM: Lsn = Lsn(0); -const CONTROL_FILE_NAME: &str = "safekeeper.control"; - -/// Unique node identifier used by Paxos -#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] -pub struct NodeId { - term: u64, - uuid: [u8; 16], -} - -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -pub struct ServerInfo { - /// proxy-safekeeper protocol version - pub protocol_version: u32, - /// Postgres server version - pub pg_version: u32, - pub node_id: NodeId, - pub system_id: SystemId, - /// Zenith timelineid - pub timeline_id: ZTimelineId, - pub wal_end: Lsn, - pub timeline: TimeLineID, - pub wal_seg_size: u32, -} - -/// Vote request sent from proxy to safekeepers -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct RequestVote { - node_id: NodeId, - /// volume commit LSN - vcl: Lsn, - /// new epoch when safekeeper reaches vcl - epoch: u64, -} - -/// Information of about storage node -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct SafeKeeperInfo { - /// magic for verifying content the control file - pub magic: u32, - /// safekeeper format version - pub format_version: u32, - /// safekeeper's epoch - pub epoch: u64, - /// information about server - pub server: ServerInfo, - /// part of WAL acknowledged by quorum - pub commit_lsn: Lsn, - /// locally flushed part of WAL - pub flush_lsn: Lsn, - /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers - pub restart_lsn: Lsn, -} - -/// Hot standby feedback received from replica -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct HotStandbyFeedback { - pub ts: TimestampTz, - pub xmin: FullTransactionId, - pub catalog_xmin: FullTransactionId, -} - -/// Request with WAL message sent from proxy to safekeeper. -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct SafeKeeperRequest { - /// Sender's node identifier (looks like we do not need it for TCP streaming connection) - sender_id: NodeId, - /// start position of message in WAL - begin_lsn: Lsn, - /// end position of message in WAL - end_lsn: Lsn, - /// restart LSN position (minimal LSN which may be needed by proxy to perform recovery) - restart_lsn: Lsn, - /// LSN committed by quorum of safekeepers - commit_lsn: Lsn, -} - -/// Report safekeeper state to proxy -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct SafeKeeperResponse { - epoch: u64, - flush_lsn: Lsn, - hs_feedback: HotStandbyFeedback, -} - -/// Shared state associated with database instance (tenant) -#[derive(Debug)] -pub struct SharedState { - /// quorum commit LSN - pub commit_lsn: Lsn, - /// information about this safekeeper - pub info: SafeKeeperInfo, - /// opened file control file handle (needed to hold exlusive file lock - pub control_file: Option, - /// combined hot standby feedback from all replicas - pub hs_feedback: HotStandbyFeedback, -} - -impl SharedState { - pub fn new() -> Self { - Self { - commit_lsn: Lsn(0), - info: SafeKeeperInfo::new(), - control_file: None, - hs_feedback: HotStandbyFeedback { - ts: 0, - xmin: u64::MAX, - catalog_xmin: u64::MAX, - }, - } - } - - /// Accumulate hot standby feedbacks from replicas - pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) { - self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin); - self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin); - self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts); - } - - /// Load and lock control file (prevent running more than one instance of safekeeper) - pub fn load_control_file( - &mut self, - conf: &WalAcceptorConf, - timelineid: ZTimelineId, - ) -> Result<()> { - if self.control_file.is_some() { - info!("control file for timeline {} is already open", timelineid); - return Ok(()); - } - - let control_file_path = conf - .data_dir - .join(timelineid.to_string()) - .join(CONTROL_FILE_NAME); - info!("loading control file {}", control_file_path.display()); - match OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&control_file_path) - { - Ok(file) => { - // Lock file to prevent two or more active wal_acceptors - match file.try_lock_exclusive() { - Ok(()) => {} - Err(e) => { - bail!( - "Control file {:?} is locked by some other process: {}", - &control_file_path, - e - ); - } - } - self.control_file = Some(file); - - let cfile_ref = self.control_file.as_mut().unwrap(); - match SafeKeeperInfo::des_from(cfile_ref) { - Err(e) => { - warn!("read from {:?} failed: {}", control_file_path, e); - } - Ok(info) => { - if info.magic != SK_MAGIC { - bail!("Invalid control file magic: {}", info.magic); - } - if info.format_version != SK_FORMAT_VERSION { - bail!( - "Incompatible format version: {} vs. {}", - info.format_version, - SK_FORMAT_VERSION - ); - } - self.info = info; - } - } - } - Err(e) => { - panic!( - "Failed to open control file {:?}: {}", - &control_file_path, e - ); - } - } - Ok(()) - } - - pub fn save_control_file(&mut self, sync: bool) -> Result<()> { - let file = self.control_file.as_mut().unwrap(); - file.seek(SeekFrom::Start(0))?; - self.info.ser_into(file)?; - if sync { - file.sync_all()?; - } - Ok(()) - } -} - -#[derive(Debug)] -pub struct ReceiveWalConn { - pub timeline: Option>, - /// Postgres connection, buffered input - pub stream_in: BufReader, - /// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain. - pub stream_out: TcpStream, - /// The cached result of socket.peer_addr() - pub peer_addr: SocketAddr, - /// wal acceptor configuration - pub conf: WalAcceptorConf, -} - -impl SafeKeeperInfo { - fn new() -> SafeKeeperInfo { - SafeKeeperInfo { - magic: SK_MAGIC, - format_version: SK_FORMAT_VERSION, - epoch: 0, - server: ServerInfo { - protocol_version: SK_PROTOCOL_VERSION, /* proxy-safekeeper protocol version */ - pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ - node_id: NodeId { - term: 0, - uuid: [0; 16], - }, - system_id: 0, /* Postgres system identifier */ - timeline_id: ZTimelineId::from([0u8; 16]), - wal_end: Lsn(0), - timeline: 0, - wal_seg_size: 0, - }, - commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ - flush_lsn: Lsn(0), /* locally flushed part of WAL */ - restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ - } - } -} /// Accept incoming TCP connections and spawn them into a background thread. pub fn thread_main(conf: WalAcceptorConf) -> Result<()> { @@ -320,307 +65,3 @@ fn peek_u32(stream: &mut TcpStream) -> Result { } } } - -impl ReceiveWalConn { - pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result { - let peer_addr = socket.peer_addr()?; - let conn = ReceiveWalConn { - timeline: None, - stream_in: BufReader::new(socket.try_clone()?), - stream_out: socket, - peer_addr, - conf, - }; - Ok(conn) - } - - fn read_req(&mut self) -> Result { - // As the trait bound implies, this always encodes little-endian. - Ok(T::des_from(&mut self.stream_in)?) - } - - fn request_callback(&self) -> std::result::Result<(), postgres::error::Error> { - if let Some(addr) = self.conf.pageserver_addr { - let ps_connstr = format!( - "host={} port={} dbname={} user={}", - addr.ip(), - addr.port(), - "no_db", - "no_user", - ); - let callme = format!( - "callmemaybe {} host={} port={} options='-c ztimelineid={}'", - self.timeline.get().timelineid, - self.conf.listen_addr.ip(), - self.conf.listen_addr.port(), - self.timeline.get().timelineid - ); - info!( - "requesting page server to connect to us: start {} {}", - ps_connstr, callme - ); - let mut client = Client::connect(&ps_connstr, NoTls)?; - client.simple_query(&callme)?; - } - Ok(()) - } - - /// Receive WAL from wal_proposer - fn run(&mut self) -> Result<()> { - // Receive information about server - let server_info = self.read_req::()?; - info!( - "Start handshake with wal_proposer {} sysid {} timeline {}", - self.peer_addr, server_info.system_id, server_info.timeline_id, - ); - // FIXME: also check that the system identifier matches - self.timeline.set(server_info.timeline_id)?; - self.timeline.get().load_control_file(&self.conf)?; - - let mut my_info = self.timeline.get().get_info(); - - /* Check protocol compatibility */ - if server_info.protocol_version != SK_PROTOCOL_VERSION { - bail!( - "Incompatible protocol version {}, expected {}", - server_info.protocol_version, - SK_PROTOCOL_VERSION - ); - } - /* Postgres upgrade is not treated as fatal error */ - if server_info.pg_version != my_info.server.pg_version - && my_info.server.pg_version != UNKNOWN_SERVER_VERSION - { - info!( - "Incompatible server version {}, expected {}", - server_info.pg_version, my_info.server.pg_version - ); - } - - /* Update information about server, but preserve locally stored node_id */ - let node_id = my_info.server.node_id; - my_info.server = server_info; - my_info.server.node_id = node_id; - - /* Calculate WAL end based on local data */ - let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true); - my_info.flush_lsn = flush_lsn; - my_info.server.timeline = timeline; - - /* Report my identifier to proxy */ - my_info.ser_into(&mut self.stream_out)?; - - /* Wait for vote request */ - let prop = self.read_req::()?; - /* This is Paxos check which should ensure that only one master can perform commits */ - if prop.node_id < my_info.server.node_id { - /* Send my node-id to inform proxy that it's candidate was rejected */ - my_info.server.node_id.ser_into(&mut self.stream_out)?; - bail!( - "Reject connection attempt with term {} because my term is {}", - prop.node_id.term, - my_info.server.node_id.term, - ); - } - my_info.server.node_id = prop.node_id; - self.timeline.get().set_info(&my_info); - /* Need to persist our vote first */ - self.timeline.get().save_control_file(true)?; - - let mut flushed_restart_lsn = Lsn(0); - let wal_seg_size = server_info.wal_seg_size as usize; - - /* Acknowledge the proposed candidate by returning it to the proxy */ - prop.node_id.ser_into(&mut self.stream_out)?; - - // Need to establish replication channel with page server. - // Add far as replication in postgres is initiated by receiver, we should use callme mechanism - if let Err(e) = self.request_callback() { - // Do not treate it as fatal error and continue work - // FIXME: we should retry after a while... - error!("Failed to send callme request to pageserver: {}", e); - } - - info!( - "Start streaming from timeline {} address {:?}", - server_info.timeline_id, self.peer_addr, - ); - - // Main loop - loop { - let mut sync_control_file = false; - - /* Receive message header */ - let req = self.read_req::()?; - if req.sender_id != my_info.server.node_id { - bail!("Sender NodeId is changed"); - } - if req.begin_lsn == END_OF_STREAM { - info!("Server stops streaming"); - break; - } - let start_pos = req.begin_lsn; - let end_pos = req.end_lsn; - let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - assert!(rec_size <= MAX_SEND_SIZE); - - debug!( - "received for {} bytes between {} and {}", - rec_size, start_pos, end_pos, - ); - - /* Receive message body */ - let mut inbuf = vec![0u8; rec_size]; - self.stream_in.read_exact(&mut inbuf)?; - - /* Save message in file */ - self.write_wal_file(start_pos, timeline, wal_seg_size, &inbuf)?; - - my_info.restart_lsn = req.restart_lsn; - my_info.commit_lsn = req.commit_lsn; - - /* - * Epoch switch happen when written WAL record cross the boundary. - * The boundary is maximum of last WAL position at this node (FlushLSN) and global - * maximum (vcl) determined by safekeeper_proxy during handshake. - * Switching epoch means that node completes recovery and start writing in the WAL new data. - */ - if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) { - info!("Switch to new epoch {}", prop.epoch); - my_info.epoch = prop.epoch; /* bump epoch */ - sync_control_file = true; - } - if end_pos > my_info.flush_lsn { - my_info.flush_lsn = end_pos; - } - /* - * Update restart LSN in control file. - * To avoid negative impact on performance of extra fsync, do it only - * when restart_lsn delta exceeds WAL segment size. - */ - sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; - self.timeline.get().save_control_file(sync_control_file)?; - - if sync_control_file { - flushed_restart_lsn = my_info.restart_lsn; - } - - /* Report flush position */ - //info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32); - let resp = SafeKeeperResponse { - epoch: my_info.epoch, - flush_lsn: end_pos, - hs_feedback: self.timeline.get().get_hs_feedback(), - }; - resp.ser_into(&mut self.stream_out)?; - - /* - * Ping wal sender that new data is available. - * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. - */ - self.timeline - .get() - .notify_wal_senders(min(req.commit_lsn, end_pos)); - } - Ok(()) - } - - fn write_wal_file( - &self, - startpos: Lsn, - timeline: TimeLineID, - wal_seg_size: usize, - buf: &[u8], - ) -> Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size as u64); - let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let wal_file_path = self - .conf - .data_dir - .join(self.timeline.get().timelineid.to_string()) - .join(wal_file_name.clone()); - let wal_file_partial_path = self - .conf - .data_dir - .join(self.timeline.get().timelineid.to_string()) - .join(wal_file_name.clone() + ".partial"); - - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) - { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(&ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // Flush file is not prohibited - if !self.conf.no_sync { - wal_file.sync_all()?; - } - } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size as u64) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) - } -}