diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index e94da1de0a..38a9b1612e 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -121,6 +121,7 @@ fn find_end_of_wal_segment( let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS]; while offs < wal_seg_size { + // we are at the beginning of the page; read it in if offs % XLOG_BLCKSZ == 0 { if let Ok(bytes_read) = file.read(&mut buf) { if bytes_read != buf.len() { @@ -144,11 +145,12 @@ fn find_end_of_wal_segment( } else { offs += XLOG_SIZE_OF_XLOG_SHORT_PHD; } + // beginning of the next record } else if contlen == 0 { let page_offs = offs % XLOG_BLCKSZ; let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize; if xl_tot_len == 0 { - break; + break; // zeros, reached the end } last_valid_rec_pos = offs; offs += 4; @@ -156,12 +158,13 @@ fn find_end_of_wal_segment( contlen = xl_tot_len - 4; rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]); } else { - let page_offs = offs % XLOG_BLCKSZ; // we're continuing a record, possibly from previous page. + let page_offs = offs % XLOG_BLCKSZ; let pageleft = XLOG_BLCKSZ - page_offs; // read the rest of the record, or as much as fits on this page. let n = min(contlen, pageleft); + // fill rec_hdr (header up to (but not including) xl_crc field) if rec_offs < XLOG_RECORD_CRC_OFFS { let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n); rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]); @@ -185,6 +188,8 @@ fn find_end_of_wal_segment( crc = crc32c_append(crc, &rec_hdr); offs = (offs + 7) & !7; // pad on 8 bytes boundary */ if crc == wal_crc { + // record is valid, advance the result to its end (with + // alignment to the next record taken into account) last_valid_rec_pos = offs; } else { info!( @@ -201,6 +206,9 @@ fn find_end_of_wal_segment( /// /// Scan a directory that contains PostgreSQL WAL files, for the end of WAL. +/// If precise, returns end LSN (next insertion point, basically); +/// otherwise, start of the last segment. +/// Returns (0, 0) if there is no WAL. /// pub fn find_end_of_wal( data_dir: &Path, diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 399669130b..1f49738950 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -5,6 +5,7 @@ use std::time::Duration; pub mod receive_wal; pub mod replication; pub mod s3_offload; +pub mod safekeeper; pub mod send_wal; pub mod timeline; pub mod wal_service; diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 0ca2d2252c..646f6d9f88 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -1,142 +1,26 @@ -//! This implements the Safekeeper protocol, picking up immediately after the "START_WAL_PUSH" message -//! -//! FIXME: better description needed here +//! Safekeeper communication endpoint to wal proposer (compute node). +//! Gets messages from the network, passes them down to consensus module and +//! sends replies back. use anyhow::{bail, Context, Result}; -use bincode::config::Options; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use log::*; use postgres::{Client, Config, NoTls}; -use serde::{Deserialize, Serialize}; -use std::cmp::{max, min}; -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; + use std::net::SocketAddr; -use std::str; -use std::sync::Arc; use std::thread; use std::thread::sleep; -use zenith_utils::bin_ser::{self, le_coder, LeSer}; -use zenith_utils::connstring::connection_host_port; -use zenith_utils::lsn::Lsn; -use zenith_utils::postgres_backend::PostgresBackend; -use zenith_utils::pq_proto::{BeMessage, FeMessage, SystemId}; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; -use crate::replication::HotStandbyFeedback; +use crate::safekeeper::AcceptorProposerMessage; +use crate::safekeeper::ProposerAcceptorMessage; + use crate::send_wal::SendWalHandler; -use crate::timeline::{Timeline, TimelineTools}; +use crate::timeline::TimelineTools; use crate::WalAcceptorConf; -use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; - -pub const SK_MAGIC: u32 = 0xcafeceefu32; -pub const SK_FORMAT_VERSION: u32 = 1; -const SK_PROTOCOL_VERSION: u32 = 1; -const UNKNOWN_SERVER_VERSION: u32 = 0; -const END_OF_STREAM: Lsn = Lsn(0); -pub const CONTROL_FILE_NAME: &str = "safekeeper.control"; - -/// Unique node identifier used by Paxos -#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] -pub struct NodeId { - term: u64, - uuid: [u8; 16], -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct ServerInfo { - /// proposer-safekeeper protocol version - pub protocol_version: u32, - /// Postgres server version - pub pg_version: u32, - pub node_id: NodeId, - pub system_id: SystemId, - /// Zenith timelineid - pub timeline_id: ZTimelineId, - pub wal_end: Lsn, - pub timeline: TimeLineID, - pub wal_seg_size: u32, - pub tenant_id: ZTenantId, -} - -/// Vote request sent from proposer to safekeepers -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct RequestVote { - node_id: NodeId, - /// volume commit LSN - vcl: Lsn, - /// new epoch when safekeeper reaches vcl - epoch: u64, -} - -/// Information of about storage node -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct SafeKeeperInfo { - /// magic for verifying content the control file - pub magic: u32, - /// safekeeper format version - pub format_version: u32, - /// safekeeper's epoch - pub epoch: u64, - /// information about server - pub server: ServerInfo, - /// part of WAL acknowledged by quorum - pub commit_lsn: Lsn, - /// locally flushed part of WAL - pub flush_lsn: Lsn, - /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers - pub restart_lsn: Lsn, -} - -impl SafeKeeperInfo { - pub fn new() -> SafeKeeperInfo { - SafeKeeperInfo { - magic: SK_MAGIC, - format_version: SK_FORMAT_VERSION, - epoch: 0, - server: ServerInfo { - protocol_version: SK_PROTOCOL_VERSION, /* proposer-safekeeper protocol version */ - pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ - node_id: NodeId { - term: 0, - uuid: [0; 16], - }, - system_id: 0, /* Postgres system identifier */ - timeline_id: ZTimelineId::from([0u8; 16]), - wal_end: Lsn(0), - timeline: 0, - wal_seg_size: 0, - tenant_id: ZTenantId::from([0u8; 16]), - }, - commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ - flush_lsn: Lsn(0), /* locally flushed part of WAL */ - restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ - } - } -} - -/// Request with WAL message sent from proposer to safekeeper. -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct SafeKeeperRequest { - /// Sender's node identifier (looks like we do not need it for TCP streaming connection) - sender_id: NodeId, - /// start position of message in WAL - begin_lsn: Lsn, - /// end position of message in WAL - end_lsn: Lsn, - /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery) - restart_lsn: Lsn, - /// LSN committed by quorum of safekeepers - commit_lsn: Lsn, -} - -/// Report safekeeper state to proposer -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct SafeKeeperResponse { - epoch: u64, - flush_lsn: Lsn, - hs_feedback: HotStandbyFeedback, -} +use zenith_utils::connstring::connection_host_port; +use zenith_utils::postgres_backend::PostgresBackend; +use zenith_utils::pq_proto::{BeMessage, FeMessage}; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; pub struct ReceiveWalConn<'pg> { /// Postgres connection @@ -206,315 +90,58 @@ impl<'pg> ReceiveWalConn<'pg> { } } - // Read the result of a `CopyData` message sent from the postgres instance - // - // As the trait bound implies, this always encodes little-endian. - fn read_msg(&mut self) -> Result { + // Read and parse message sent from the postgres instance + fn read_msg(&mut self) -> Result { let data = self.read_msg_bytes()?; - // Taken directly from `LeSer::des`: - let value = le_coder() - .reject_trailing_bytes() - .deserialize(&data) - .or(Err(bin_ser::DeserializeError::BadInput))?; - Ok(value) + ProposerAcceptorMessage::parse(data) } - // Writes the value into a `CopyData` message sent to the postgres instance - fn write_msg(&mut self, value: &T) -> Result<()> { + // Send message to the postgres + fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> { let mut buf = Vec::new(); - value.ser_into(&mut buf)?; + msg.serialize(&mut buf)?; self.pg_backend.write_message(&BeMessage::CopyData(&buf))?; Ok(()) } /// Receive WAL from wal_proposer pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> { - let mut this_timeline: Option> = None; - // Notify the libpq client that it's allowed to send `CopyData` messages self.pg_backend .write_message(&BeMessage::CopyBothResponse)?; // Receive information about server - let server_info = self - .read_msg::() - .context("Failed to receive server info")?; - info!( - "Start handshake with wal_proposer {} sysid {} timeline {} tenant {}", - self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id, - ); - // FIXME: also check that the system identifier matches - this_timeline.set(server_info.timeline_id)?; - this_timeline.get().load_control_file(&swh.conf)?; - - let mut my_info = this_timeline.get().get_info(); - - /* Check protocol compatibility */ - if server_info.protocol_version != SK_PROTOCOL_VERSION { - bail!( - "Incompatible protocol version {}, expected {}", - server_info.protocol_version, - SK_PROTOCOL_VERSION - ); - } - /* Postgres upgrade is not treated as fatal error */ - if server_info.pg_version != my_info.server.pg_version - && my_info.server.pg_version != UNKNOWN_SERVER_VERSION - { - info!( - "Incompatible server version {}, expected {}", - server_info.pg_version, my_info.server.pg_version - ); + let mut msg = self + .read_msg() + .context("failed to receive proposer greeting")?; + let tenant_id: ZTenantId; + match msg { + ProposerAcceptorMessage::Greeting(ref greeting) => { + info!( + "start handshake with wal proposer {} sysid {} timeline {}", + self.peer_addr, greeting.system_id, greeting.tli, + ); + tenant_id = greeting.tenant_id; + } + _ => bail!("unexpected message {:?} instead of greeting", msg), } - /* Update information about server, but preserve locally stored node_id */ - let node_id = my_info.server.node_id; - my_info.server = server_info.clone(); - my_info.server.node_id = node_id; - - /* Calculate WAL end based on local data */ - let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true); - my_info.flush_lsn = flush_lsn; - my_info.server.timeline = timeline_id; - - info!( - "find_end_of_wal in {:?}: timeline={} flush_lsn={}", - &swh.conf.data_dir, timeline_id, flush_lsn - ); - - /* Report my identifier to proposer */ - self.write_msg(&my_info)?; - - /* Wait for vote request */ - let prop = self - .read_msg::() - .context("Failed to read vote request")?; - /* This is Paxos check which should ensure that only one master can perform commits */ - if prop.node_id < my_info.server.node_id { - /* Send my node-id to inform proposer that it's candidate was rejected */ - self.write_msg(&my_info.server.node_id)?; - bail!( - "Reject connection attempt with term {} because my term is {}", - prop.node_id.term, - my_info.server.node_id.term, - ); - } - my_info.server.node_id = prop.node_id; - this_timeline.get().set_info(&my_info); - /* Need to persist our vote first */ - this_timeline.get().save_control_file(true)?; - - let mut flushed_restart_lsn = Lsn(0); - let wal_seg_size = server_info.wal_seg_size as usize; - - /* Acknowledge the proposed candidate by returning it to the proposer */ - self.write_msg(&prop.node_id)?; - + // if requested, ask pageserver to fetch wal from us + // xxx: this place seems not really fitting if swh.conf.pageserver_addr.is_some() { // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver, we should use callme mechanism let conf = swh.conf.clone(); - let timelineid = this_timeline.get().timelineid; - let tenantid = server_info.tenant_id; + let timelineid = swh.timeline.get().timelineid; thread::spawn(move || { - request_callback(conf, timelineid, tenantid); + request_callback(conf, timelineid, tenant_id); }); } - info!( - "Start streaming from timeline {} tenant {} address {:?} flush_lsn={}", - server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn - ); - - // Main loop loop { - let mut sync_control_file = false; - - /* Receive message header */ - let msg_bytes = self.read_msg_bytes()?; - let mut msg_reader = msg_bytes.reader(); - - let req = SafeKeeperRequest::des_from(&mut msg_reader) - .context("Failed to get WAL message header")?; - if req.sender_id != my_info.server.node_id { - bail!("Sender NodeId is changed"); - } - if req.begin_lsn == END_OF_STREAM { - info!("Server stops streaming"); - break; - } - let start_pos = req.begin_lsn; - let end_pos = req.end_lsn; - let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - assert!(rec_size <= MAX_SEND_SIZE); - - debug!( - "received for {} bytes between {} and {}", - rec_size, start_pos, end_pos, - ); - - /* Receive message body (from the rest of the message) */ - let mut buf = Vec::with_capacity(rec_size); - msg_reader.read_to_end(&mut buf)?; - assert_eq!(buf.len(), rec_size); - - /* Save message in file */ - Self::write_wal_file( - swh, - start_pos, - timeline_id, - this_timeline.get(), - wal_seg_size, - &buf, - )?; - - my_info.restart_lsn = req.restart_lsn; - my_info.commit_lsn = req.commit_lsn; - - /* - * Epoch switch happen when written WAL record cross the boundary. - * The boundary is maximum of last WAL position at this node (FlushLSN) and global - * maximum (vcl) determined by WAL proposer during handshake. - * Switching epoch means that node completes recovery and start writing in the WAL new data. - */ - if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) { - info!("Switch to new epoch {}", prop.epoch); - my_info.epoch = prop.epoch; /* bump epoch */ - sync_control_file = true; - } - if end_pos > my_info.flush_lsn { - my_info.flush_lsn = end_pos; - } - /* - * Update restart LSN in control file. - * To avoid negative impact on performance of extra fsync, do it only - * when restart_lsn delta exceeds WAL segment size. - */ - sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; - this_timeline.get().save_control_file(sync_control_file)?; - - if sync_control_file { - flushed_restart_lsn = my_info.restart_lsn; - } - - /* Report flush position */ - //info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32); - let resp = SafeKeeperResponse { - epoch: my_info.epoch, - flush_lsn: end_pos, - hs_feedback: this_timeline.get().get_hs_feedback(), - }; - self.write_msg(&resp)?; - - /* - * Ping wal sender that new data is available. - * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. - */ - this_timeline - .get() - .notify_wal_senders(min(req.commit_lsn, end_pos)); + let reply = swh.timeline.get().process_msg(&msg)?; + self.write_msg(&reply)?; + msg = self.read_msg()?; } - - Ok(()) - } - - fn write_wal_file( - swh: &SendWalHandler, - startpos: Lsn, - timeline_id: TimeLineID, - timeline: &Arc, - wal_seg_size: usize, - buf: &[u8], - ) -> Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName(timeline_id, segno, wal_seg_size); - let wal_file_path = swh - .conf - .data_dir - .join(timeline.timelineid.to_string()) - .join(wal_file_name.clone()); - let wal_file_partial_path = swh - .conf - .data_dir - .join(timeline.timelineid.to_string()) - .join(wal_file_name.clone() + ".partial"); - - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) - { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(&ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // Flush file is not prohibited - if !swh.conf.no_sync { - wal_file.sync_all()?; - } - } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) } } diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 9012e8e10d..e011b428f5 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -35,6 +35,16 @@ pub struct HotStandbyFeedback { pub catalog_xmin: FullTransactionId, } +impl HotStandbyFeedback { + pub fn empty() -> HotStandbyFeedback { + HotStandbyFeedback { + ts: 0, + xmin: 0, + catalog_xmin: 0, + } + } +} + /// A network connection that's speaking the replication protocol. pub struct ReplicationConn { /// This is an `Option` because we will spawn a background thread that will @@ -150,7 +160,7 @@ impl ReplicationConn { break; } } - let (wal_end, timeline) = swh.timeline.find_end_of_wal(&swh.conf.data_dir, true); + let (wal_end, timeline) = swh.timeline.get().get_end_of_wal(); if start_pos == Lsn(0) { start_pos = wal_end; } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs new file mode 100644 index 0000000000..b9fcae80f3 --- /dev/null +++ b/walkeeper/src/safekeeper.rs @@ -0,0 +1,548 @@ +//! Acceptor part of proposer-acceptor consensus algorithm. + +use anyhow::{anyhow, bail, Result}; +use byteorder::LittleEndian; +use byteorder::ReadBytesExt; +use byteorder::WriteBytesExt; +use bytes::Buf; +use bytes::Bytes; +use log::*; +use postgres_ffi::xlog_utils::TimeLineID; +use serde::{Deserialize, Serialize}; +use std::cmp::max; +use std::io; +use std::io::Read; + +use crate::replication::HotStandbyFeedback; +use postgres_ffi::xlog_utils::MAX_SEND_SIZE; +use zenith_utils::bin_ser::LeSer; +use zenith_utils::lsn::Lsn; +use zenith_utils::pq_proto::SystemId; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; + +pub const SK_MAGIC: u32 = 0xcafeceefu32; +pub const SK_FORMAT_VERSION: u32 = 1; +const SK_PROTOCOL_VERSION: u32 = 1; +const UNKNOWN_SERVER_VERSION: u32 = 0; + +/// Consensus logical timestamp. +type Term = u64; + +/// Unique id of proposer. Not needed for correctness, used for monitoring. +type PgUuid = [u8; 16]; + +/// Persistent consensus state of the acceptor. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AcceptorState { + /// acceptor's last term it voted for (advanced in 1 phase) + pub term: Term, + /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached). + pub epoch: Term, +} + +/// Information about Postgres. Safekeeper gets it once and then verifies +/// all further connections from computes match. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ServerInfo { + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + /// Zenith timelineid + pub ztli: ZTimelineId, + pub tli: TimeLineID, + pub wal_seg_size: u32, +} + +/// Persistent information stored on safekeeper node +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SafeKeeperState { + /// magic for verifying content the control file + pub magic: u32, + /// safekeeper format version + pub format_version: u32, + /// persistent acceptor state + pub acceptor_state: AcceptorState, + /// information about server + pub server: ServerInfo, + /// Unique id of the last *elected* proposer we dealed with. Not needed + /// correctness, exists for monitoring purposes. + pub proposer_uuid: PgUuid, + /// part of WAL acknowledged by quorum + pub commit_lsn: Lsn, + /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers + pub restart_lsn: Lsn, +} + +impl SafeKeeperState { + pub fn new() -> SafeKeeperState { + SafeKeeperState { + magic: SK_MAGIC, + format_version: SK_FORMAT_VERSION, + acceptor_state: AcceptorState { term: 0, epoch: 0 }, + server: ServerInfo { + pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ + system_id: 0, /* Postgres system identifier */ + ztli: ZTimelineId::from([0u8; 16]), + tli: 0, + wal_seg_size: 0, + }, + proposer_uuid: [0; 16], + commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ + restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ + } + } +} + +// protocol messages + +/// Initial Proposer -> Acceptor message +#[derive(Debug, Serialize, Deserialize)] +pub struct ProposerGreeting { + /// proposer-acceptor protocol version + pub protocol_version: u32, + /// Postgres server version + pub pg_version: u32, + pub proposer_id: PgUuid, + pub system_id: SystemId, + /// Zenith timelineid + pub ztli: ZTimelineId, + pub tenant_id: ZTenantId, + pub tli: TimeLineID, + pub wal_seg_size: u32, +} + +/// Acceptor -> Proposer initial response: the highest term known to me +/// (acceptor voted for). +#[derive(Debug, Serialize, Deserialize)] +pub struct AcceptorGreeting { + term: u64, +} + +/// Vote request sent from proposer to safekeepers +#[derive(Debug, Serialize, Deserialize)] +pub struct VoteRequest { + term: Term, +} + +/// Vote itself, sent from safekeeper to proposer +#[derive(Debug, Serialize, Deserialize)] +pub struct VoteResponse { + term: Term, // not really needed, just a sanity check + vote_given: u64, // fixme u64 due to padding + /// Safekeeper's log position, to let proposer choose the most advanced one + epoch: Term, + flush_lsn: Lsn, + restart_lsn: Lsn, +} + +/// Request with WAL message sent from proposer to safekeeper. Along the way it +/// announces 1) successful election (with VCL); 2) commit_lsn. +#[derive(Debug, Serialize, Deserialize)] +pub struct AppendRequest { + h: AppendRequestHeader, + wal_data: Bytes, +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AppendRequestHeader { + term: Term, + /// volume commit LSN + vcl: Lsn, + /// start position of message in WAL + begin_lsn: Lsn, + /// end position of message in WAL + end_lsn: Lsn, + /// LSN committed by quorum of safekeepers + commit_lsn: Lsn, + /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery) + restart_lsn: Lsn, + // only for logging/debugging + proposer_uuid: PgUuid, +} + +/// Report safekeeper state to proposer +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct AppendResponse { + // Current term of the safekeeper; if it is higher than proposer's, the + // compute is out of date. + pub term: Term, + pub epoch: Term, + // NOTE: this is physical end of wal on safekeeper; currently it doesn't + // make much sense without taking epoch into account, as history can be + // diverged. + pub flush_lsn: Lsn, + pub hs_feedback: HotStandbyFeedback, +} + +/// Proposer -> Acceptor messages +#[derive(Debug)] +pub enum ProposerAcceptorMessage { + Greeting(ProposerGreeting), + VoteRequest(VoteRequest), + AppendRequest(AppendRequest), +} + +impl ProposerAcceptorMessage { + /// Parse proposer message. + pub fn parse(msg: Bytes) -> Result { + // xxx using Reader is inefficient but easy to work with bincode + let mut stream = msg.reader(); + // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is + let tag = stream.read_u64::()? as u8 as char; + match tag { + 'g' => { + let msg = ProposerGreeting::des_from(&mut stream)?; + Ok(ProposerAcceptorMessage::Greeting(msg)) + } + 'v' => { + let msg = VoteRequest::des_from(&mut stream)?; + Ok(ProposerAcceptorMessage::VoteRequest(msg)) + } + 'a' => { + // read header followed by wal data + let hdr = AppendRequestHeader::des_from(&mut stream)?; + let rec_size = hdr + .end_lsn + .checked_sub(hdr.begin_lsn) + .ok_or(anyhow!("begin_lsn > end_lsn in AppendRequest"))? + .0 as usize; + if rec_size > MAX_SEND_SIZE { + bail!( + "AppendRequest is longer than MAX_SEND_SIZE ({})", + MAX_SEND_SIZE + ); + } + + let mut wal_data_vec: Vec = vec![0; rec_size]; + stream.read_exact(&mut wal_data_vec)?; + let wal_data = Bytes::from(wal_data_vec); + let msg = AppendRequest { + h: hdr, + wal_data: wal_data, + }; + + Ok(ProposerAcceptorMessage::AppendRequest(msg)) + } + _ => Err(anyhow!("unknown proposer-acceptor message tag: {}", tag,)), + } + } +} + +/// Acceptor -> Proposer messages +#[derive(Debug)] +pub enum AcceptorProposerMessage { + Greeting(AcceptorGreeting), + VoteResponse(VoteResponse), + AppendResponse(AppendResponse), +} + +impl AcceptorProposerMessage { + /// Serialize acceptor -> proposer message. + pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> { + match self { + AcceptorProposerMessage::Greeting(msg) => { + stream.write_u64::('g' as u64)?; + msg.ser_into(stream)?; + } + AcceptorProposerMessage::VoteResponse(msg) => { + stream.write_u64::('v' as u64)?; + msg.ser_into(stream)?; + } + AcceptorProposerMessage::AppendResponse(msg) => { + stream.write_u64::('a' as u64)?; + msg.ser_into(stream)?; + } + } + + Ok(()) + } +} + +pub trait Storage { + /// Persist safekeeper state on disk, optionally syncing it. + fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>; + /// Write piece of wal in buf to disk. + fn write_wal(&mut self, s: &SafeKeeperState, startpos: Lsn, buf: &[u8]) -> Result<()>; +} + +/// SafeKeeper which consumes events (messages from compute) and provides +/// replies. +#[derive(Debug)] +pub struct SafeKeeper { + /// Locally flushed part of WAL (end_lsn of last record). Established by + /// reading wal. + pub flush_lsn: Lsn, + pub tli: u32, + pub flushed_restart_lsn: Lsn, + pub storage: ST, + pub s: SafeKeeperState, // persistent part + pub elected_proposer_term: Term, // for monitoring/debugging +} + +impl SafeKeeper +where + ST: Storage, +{ + // constructor + pub fn new(flush_lsn: Lsn, tli: u32, storage: ST, state: SafeKeeperState) -> SafeKeeper { + SafeKeeper { + flush_lsn, + tli, + flushed_restart_lsn: Lsn(0), + storage, + s: state, + elected_proposer_term: 0, + } + } + + /// Process message from proposer and possibly form reply. Concurrent + /// callers must exclude each other. + pub fn process_msg( + &mut self, + msg: &ProposerAcceptorMessage, + ) -> Result { + match msg { + ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), + ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), + ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg), + } + } + + /// Handle initial message from proposer: check its sanity and send my + /// current term. + fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result { + /* Check protocol compatibility */ + if msg.protocol_version != SK_PROTOCOL_VERSION { + bail!( + "incompatible protocol version {}, expected {}", + msg.protocol_version, + SK_PROTOCOL_VERSION + ); + } + if self.s.server.system_id != 0 && self.s.server.system_id != msg.system_id { + bail!( + "system identifier changed: got {}, expected {}", + msg.system_id, + self.s.server.system_id, + ); + } + /* Postgres upgrade is not treated as fatal error */ + if msg.pg_version != self.s.server.pg_version + && self.s.server.pg_version != UNKNOWN_SERVER_VERSION + { + info!( + "incompatible server version {}, expected {}", + msg.pg_version, self.s.server.pg_version + ); + } + + // set basic info about server, if not yet + self.s.server.system_id = msg.system_id; + self.s.server.ztli = msg.ztli; + self.s.server.tli = msg.tli; + self.s.server.wal_seg_size = msg.wal_seg_size; + self.s.proposer_uuid = msg.proposer_id; + self.storage.persist(&self.s, true)?; + + info!( + "processed greeting from proposer {:?}, sending term {:?}", + msg.proposer_id, self.s.acceptor_state.term + ); + Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting { + term: self.s.acceptor_state.term, + })) + } + + /// Give vote for the given term, if we haven't done that previously. + fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result { + // initialize with refusal + let mut resp = VoteResponse { + term: msg.term, + vote_given: false as u64, + epoch: 0, + flush_lsn: Lsn(0), + restart_lsn: Lsn(0), + }; + if self.s.acceptor_state.term < msg.term { + self.s.acceptor_state.term = msg.term; + // persist vote before sending it out + self.storage.persist(&self.s, true)?; + resp.vote_given = true as u64; + resp.epoch = self.s.acceptor_state.epoch; + resp.flush_lsn = self.flush_lsn; + resp.restart_lsn = self.s.restart_lsn; + } + info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); + Ok(AcceptorProposerMessage::VoteResponse(resp)) + } + + /// Handle request to append WAL. + fn handle_append_request(&mut self, msg: &AppendRequest) -> Result { + // log first AppendRequest from this proposer + if self.elected_proposer_term < msg.h.term { + info!( + "start receiving WAL from timeline {} term {}", + self.s.server.ztli, msg.h.term, + ); + self.elected_proposer_term = msg.h.term; + } + + // If our term is lower than elected proposer one, bump it. + if self.s.acceptor_state.term < msg.h.term { + self.s.acceptor_state.term = msg.h.term; + self.storage.persist(&self.s, true)?; + } + // OTOH, if it is higher, immediately refuse the message. + else if self.s.acceptor_state.term > msg.h.term { + let resp = AppendResponse { + term: self.s.acceptor_state.term, + epoch: self.s.acceptor_state.epoch, + flush_lsn: Lsn(0), + hs_feedback: HotStandbyFeedback::empty(), + }; + return Ok(AcceptorProposerMessage::AppendResponse(resp)); + } + + // do the job + self.storage + .write_wal(&self.s, msg.h.begin_lsn, &msg.wal_data)?; + let mut sync_control_file = false; + /* + * Epoch switch happen when written WAL record cross the boundary. + * The boundary is maximum of last WAL position at this node (FlushLSN) and global + * maximum (vcl) determined by WAL proposer during handshake. + * Switching epoch means that node completes recovery and start writing in the WAL new data. + * XXX: this is wrong, we must actively truncate not matching part of log. + */ + if self.s.acceptor_state.epoch < msg.h.term + && msg.h.end_lsn > max(self.flush_lsn, msg.h.vcl) + { + info!("switched to new epoch {}", msg.h.term); + self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ + sync_control_file = true; + } + if msg.h.end_lsn > self.flush_lsn { + self.flush_lsn = msg.h.end_lsn; + } + + self.s.proposer_uuid = msg.h.proposer_uuid; + self.s.commit_lsn = msg.h.commit_lsn; + self.s.restart_lsn = msg.h.restart_lsn; + + /* + * Update restart LSN in control file. + * To avoid negative impact on performance of extra fsync, do it only + * when restart_lsn delta exceeds WAL segment size. + */ + sync_control_file |= + self.flushed_restart_lsn + (self.s.server.wal_seg_size as u64) < self.s.restart_lsn; + self.storage.persist(&self.s, sync_control_file)?; + if sync_control_file { + self.flushed_restart_lsn = self.s.restart_lsn; + } + + let resp = AppendResponse { + term: self.s.acceptor_state.term, + epoch: self.s.acceptor_state.epoch, + flush_lsn: self.flush_lsn, + // will be filled by caller code to avoid bothering safekeeper + hs_feedback: HotStandbyFeedback::empty(), + }; + trace!( + "processed AppendRequest of len {}, flush_lsn={:X}/{:>08X}, resp {:?}", + msg.wal_data.len(), + (self.flush_lsn.0 >> 32) as u32, + self.flush_lsn.0 as u32, + &resp, + ); + Ok(AcceptorProposerMessage::AppendResponse(resp)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // fake storage for tests + struct InMemoryStorage { + persisted_state: SafeKeeperState, + } + + impl Storage for InMemoryStorage { + fn persist(&mut self, s: &SafeKeeperState, _sync: bool) -> Result<()> { + self.persisted_state = s.clone(); + Ok(()) + } + + fn write_wal(&mut self, _s: &SafeKeeperState, _startpos: Lsn, _buf: &[u8]) -> Result<()> { + Ok(()) + } + } + + #[test] + fn test_voting() { + let storage = InMemoryStorage { + persisted_state: SafeKeeperState::new(), + }; + let mut sk = SafeKeeper::new(Lsn(0), 0, storage, SafeKeeperState::new()); + + // check voting for 1 is ok + let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); + let mut vote_resp = sk.process_msg(&vote_request); + match vote_resp.unwrap() { + AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0), + _ => assert!(false), + } + + // reboot... + let state = sk.storage.persisted_state.clone(); + let storage = InMemoryStorage { + persisted_state: state.clone(), + }; + sk = SafeKeeper::new(Lsn(0), 0, storage, state); + + // and ensure voting second time for 1 is not ok + vote_resp = sk.process_msg(&vote_request); + match vote_resp.unwrap() { + AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0), + _ => assert!(false), + } + } + + #[test] + fn test_epoch_switch() { + let storage = InMemoryStorage { + persisted_state: SafeKeeperState::new(), + }; + let mut sk = SafeKeeper::new(Lsn(0), 0, storage, SafeKeeperState::new()); + + let mut ar_hdr = AppendRequestHeader { + term: 1, + vcl: Lsn(2), + begin_lsn: Lsn(1), + end_lsn: Lsn(2), + commit_lsn: Lsn(0), + restart_lsn: Lsn(0), + proposer_uuid: [0; 16], + }; + let mut append_request = AppendRequest { + h: ar_hdr.clone(), + wal_data: Bytes::from_static(b"b"), + }; + + // check that AppendRequest before VCL doesn't switch epoch + let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); + assert!(resp.is_ok()); + assert!(sk.storage.persisted_state.acceptor_state.epoch == 0); + + // but record after VCL does the switch + ar_hdr.begin_lsn = Lsn(2); + ar_hdr.end_lsn = Lsn(3); + append_request = AppendRequest { + h: ar_hdr.clone(), + wal_data: Bytes::from_static(b"b"), + }; + let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); + assert!(resp.is_ok()); + assert!(sk.storage.persisted_state.acceptor_state.epoch == 1); + } +} diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a6be98fc1a..7dd0a1b5f1 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -21,6 +21,7 @@ pub struct SendWalHandler { pub conf: WalAcceptorConf, /// assigned application name pub appname: Option, + pub timelineid: Option, pub timeline: Option>, } @@ -29,7 +30,7 @@ impl postgres_backend::Handler for SendWalHandler { match sm.params.get("ztimelineid") { Some(ref ztimelineid) => { let ztlid = ZTimelineId::from_str(ztimelineid)?; - self.timeline.set(ztlid)?; + self.timelineid = Some(ztlid); } _ => bail!("timelineid is required"), } @@ -40,6 +41,16 @@ impl postgres_backend::Handler for SendWalHandler { } fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> { + // START_WAL_PUSH is the only command that initializes the timeline + if self.timeline.is_none() { + if query_string.starts_with(b"START_WAL_PUSH") { + self.timeline + .set(&self.conf, self.timelineid.unwrap(), true)?; + } else { + self.timeline + .set(&self.conf, self.timelineid.unwrap(), false)?; + } + } if query_string.starts_with(b"IDENTIFY_SYSTEM") { self.handle_identify_system(pgb)?; Ok(()) @@ -60,6 +71,7 @@ impl SendWalHandler { SendWalHandler { conf, appname: None, + timelineid: None, timeline: None, } } @@ -68,7 +80,7 @@ impl SendWalHandler { /// Handle IDENTIFY_SYSTEM replication command /// fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> { - let (start_pos, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false); + let (start_pos, timeline) = self.timeline.get().get_end_of_wal(); let lsn = start_pos.to_string(); let tli = timeline.to_string(); let sysid = self.timeline.get().get_info().server.system_id.to_string(); diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 6aaabeaa78..4293c773ef 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -1,50 +1,77 @@ -//! This module contains tools for managing timelines. -//! +//! This module contains timeline id -> safekeeper state map with file-backed +//! persistence and support for interaction between sending and receiving wal. use anyhow::{bail, Result}; use fs2::FileExt; use lazy_static::lazy_static; use log::*; -use postgres_ffi::xlog_utils::{find_end_of_wal, TimeLineID}; +use postgres_ffi::xlog_utils::find_end_of_wal; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; -use std::io::{Seek, SeekFrom}; -use std::path::Path; +use std::io::{Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; + use zenith_utils::zid::ZTimelineId; -use crate::receive_wal::{SafeKeeperInfo, CONTROL_FILE_NAME, SK_FORMAT_VERSION, SK_MAGIC}; use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; +use crate::safekeeper::{ + AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, Storage, + SK_FORMAT_VERSION, SK_MAGIC, +}; use crate::WalAcceptorConf; +use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; + +const CONTROL_FILE_NAME: &str = "safekeeper.control"; /// Shared state associated with database instance (tenant) #[derive(Debug)] struct SharedState { - /// quorum commit LSN + /// Safekeeper object + sk: SafeKeeper, + /// opened file control file handle (needed to hold exlusive file lock) + control_file: File, + /// For receiving-sending wal cooperation + /// quorum commit LSN we've notified walsenders about commit_lsn: Lsn, - /// information about this safekeeper - info: SafeKeeperInfo, - /// opened file control file handle (needed to hold exlusive file lock - control_file: Option, /// combined hot standby feedback from all replicas hs_feedback: HotStandbyFeedback, } impl SharedState { - fn new() -> Self { - Self { + /// Restore SharedState from control file. Locks the control file along the + /// way to prevent running more than one instance of safekeeper on the same + /// data dir. + /// If create=false and file doesn't exist, bails out. + fn create_restore( + conf: &WalAcceptorConf, + timelineid: ZTimelineId, + create: bool, + ) -> Result { + let (cf, state) = SharedState::load_control_file(conf, timelineid, create)?; + let storage = FileStorage { + control_file: cf.try_clone()?, + conf: conf.clone(), + }; + let (flush_lsn, tli) = if state.server.wal_seg_size != 0 { + let wal_dir = conf.data_dir.join(format!("{}", timelineid)); + find_end_of_wal(&wal_dir, state.server.wal_seg_size as usize, true) + } else { + (0, 0) + }; + + Ok(Self { commit_lsn: Lsn(0), - info: SafeKeeperInfo::new(), - control_file: None, + sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state), + control_file: cf, hs_feedback: HotStandbyFeedback { ts: 0, xmin: u64::MAX, catalog_xmin: u64::MAX, }, - } + }) } /// Accumulate hot standby feedbacks from replicas @@ -54,80 +81,75 @@ impl SharedState { self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts); } - /// Load and lock control file (prevent running more than one instance of safekeeper) - pub fn load_control_file( - &mut self, + /// Fetch and lock control file (prevent running more than one instance of safekeeper) + /// If create=false and file doesn't exist, bails out. + fn load_control_file( conf: &WalAcceptorConf, timelineid: ZTimelineId, - ) -> Result<()> { - if self.control_file.is_some() { - info!("control file for timeline {} is already open", timelineid); - return Ok(()); - } - + create: bool, + ) -> Result<(File, SafeKeeperState)> { let control_file_path = conf .data_dir .join(timelineid.to_string()) .join(CONTROL_FILE_NAME); - info!("loading control file {}", control_file_path.display()); - match OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&control_file_path) - { - Ok(file) => { + info!( + "loading control file {}, create={}", + control_file_path.display(), + create + ); + let mut opts = OpenOptions::new(); + opts.read(true).write(true); + if create { + opts.create(true); + } + match opts.open(&control_file_path) { + Ok(mut file) => { // Lock file to prevent two or more active wal_acceptors match file.try_lock_exclusive() { Ok(()) => {} Err(e) => { bail!( - "Control file {:?} is locked by some other process: {}", + "control file {:?} is locked by some other process: {}", &control_file_path, e ); } } - self.control_file = Some(file); - - let cfile_ref = self.control_file.as_mut().unwrap(); - match SafeKeeperInfo::des_from(cfile_ref) { - Err(e) => { - warn!("read from {:?} failed: {}", control_file_path, e); + // Empty file is legit on 'create', don't try to deser from it. + if file.metadata().unwrap().len() == 0 { + if !create { + bail!("control file is empty"); } - Ok(info) => { - if info.magic != SK_MAGIC { - bail!("Invalid control file magic: {}", info.magic); + return Ok((file, SafeKeeperState::new())); + } else { + match SafeKeeperState::des_from(&mut file) { + Err(e) => { + bail!("failed to read control file {:?}: {}", control_file_path, e); } - if info.format_version != SK_FORMAT_VERSION { - bail!( - "Incompatible format version: {} vs. {}", - info.format_version, - SK_FORMAT_VERSION - ); + Ok(s) => { + if s.magic != SK_MAGIC { + bail!("bad control file magic: {}", s.magic); + } + if s.format_version != SK_FORMAT_VERSION { + bail!( + "incompatible format version: {} vs. {}", + s.format_version, + SK_FORMAT_VERSION + ); + } + return Ok((file, s)); } - self.info = info; } } } Err(e) => { - panic!( - "Failed to open control file {:?}: {}", - &control_file_path, e + bail!( + "failed to open control file {:?}: {}", + &control_file_path, + e ); } } - Ok(()) - } - - pub fn save_control_file(&mut self, sync: bool) -> Result<()> { - let file = self.control_file.as_mut().unwrap(); - file.seek(SeekFrom::Start(0))?; - self.info.ser_into(file)?; - if sync { - file.sync_all()?; - } - Ok(()) } } @@ -179,12 +201,32 @@ impl Timeline { self.notify_wal_senders(END_REPLICATION_MARKER); } - pub fn get_info(&self) -> SafeKeeperInfo { - return self.mutex.lock().unwrap().info.clone(); + /// Pass arrived message to the safekeeper. + pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result { + let mut rmsg: AcceptorProposerMessage; + let commit_lsn: Lsn; + { + let mut shared_state = self.mutex.lock().unwrap(); + rmsg = shared_state.sk.process_msg(msg)?; + // locally available commit lsn. flush_lsn can be smaller than + // commit_lsn if we are catching up safekeeper. + commit_lsn = min(shared_state.sk.flush_lsn, shared_state.sk.s.commit_lsn); + + // if this is AppendResponse, fill in proper hot standby feedback + match rmsg { + AcceptorProposerMessage::AppendResponse(ref mut resp) => { + resp.hs_feedback = shared_state.hs_feedback.clone(); + } + _ => (), + } + } + // Ping wal sender that new data might be available. + self.notify_wal_senders(commit_lsn); + Ok(rmsg) } - pub fn set_info(&self, info: &SafeKeeperInfo) { - self.mutex.lock().unwrap().info = info.clone(); + pub fn get_info(&self) -> SafeKeeperState { + self.mutex.lock().unwrap().sk.s.clone() } // Accumulate hot standby feedbacks from replicas @@ -198,44 +240,36 @@ impl Timeline { shared_state.hs_feedback.clone() } - pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.load_control_file(conf, self.timelineid) - } - - pub fn save_control_file(&self, sync: bool) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.save_control_file(sync) + pub fn get_end_of_wal(&self) -> (Lsn, u32) { + let shared_state = self.mutex.lock().unwrap(); + (shared_state.sk.flush_lsn, shared_state.sk.tli) } } // Utilities needed by various Connection-like objects pub trait TimelineTools { - fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>; + fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool) + -> Result<()>; fn get(&self) -> &Arc; - fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID); } impl TimelineTools for Option> { - fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> { + fn set( + &mut self, + conf: &WalAcceptorConf, + timeline_id: ZTimelineId, + create: bool, + ) -> Result<()> { // We will only set the timeline once. If it were to ever change, // anyone who cloned the Arc would be out of date. assert!(self.is_none()); - *self = Some(GlobalTimelines::store(timeline_id)?); + *self = Some(GlobalTimelines::get(conf, timeline_id, create)?); Ok(()) } fn get(&self) -> &Arc { self.as_ref().unwrap() } - - /// Find last WAL record. If "precise" is false then just locate last partial segment - fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) { - let seg_size = self.get().get_info().server.wal_seg_size as usize; - let wal_dir = data_dir.join(format!("{}", self.get().timelineid)); - let (lsn, timeline) = find_end_of_wal(&wal_dir, seg_size, precise); - (Lsn(lsn), timeline) - } } lazy_static! { @@ -247,22 +281,143 @@ lazy_static! { struct GlobalTimelines; impl GlobalTimelines { - /// Store a new timeline into the global TIMELINES map. - fn store(timeline_id: ZTimelineId) -> Result> { + /// Get a timeline with control file loaded from the global TIMELINES map. + /// If control file doesn't exist and create=false, bails out. + pub fn get( + conf: &WalAcceptorConf, + timeline_id: ZTimelineId, + create: bool, + ) -> Result> { let mut timelines = TIMELINES.lock().unwrap(); match timelines.get(&timeline_id) { Some(result) => Ok(Arc::clone(result)), None => { - info!("creating timeline dir {}", timeline_id); + info!( + "creating timeline dir {}, create is {}", + timeline_id, create + ); fs::create_dir_all(timeline_id.to_string())?; - let shared_state = SharedState::new(); + let shared_state = SharedState::create_restore(conf, timeline_id, create)?; - let new_tid = Arc::new(Timeline::new(timeline_id, shared_state)); - timelines.insert(timeline_id, Arc::clone(&new_tid)); - Ok(new_tid) + let new_tli = Arc::new(Timeline::new(timeline_id, shared_state)); + timelines.insert(timeline_id, Arc::clone(&new_tli)); + Ok(new_tli) } } } } + +#[derive(Debug)] +struct FileStorage { + control_file: File, + conf: WalAcceptorConf, +} + +impl Storage for FileStorage { + fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()> { + self.control_file.seek(SeekFrom::Start(0))?; + s.ser_into(&mut self.control_file)?; + if sync { + self.control_file.sync_all()?; + } + Ok(()) + } + + fn write_wal(&mut self, s: &SafeKeeperState, startpos: Lsn, buf: &[u8]) -> Result<()> { + let mut bytes_left: usize = buf.len(); + let mut bytes_written: usize = 0; + let mut partial; + let mut start_pos = startpos; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + let wal_seg_size = s.server.wal_seg_size as usize; + let ztli = s.server.ztli; + + /* Extract WAL location for this block */ + let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; + + while bytes_left != 0 { + let bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach wal + * segment size. + */ + if xlogoff + bytes_left > wal_seg_size { + bytes_to_write = wal_seg_size - xlogoff; + } else { + bytes_to_write = bytes_left; + } + + /* Open file */ + let segno = start_pos.segment_number(wal_seg_size); + // note: we basically don't support changing pg timeline + let wal_file_name = XLogFileName(s.server.tli, segno, wal_seg_size); + let wal_file_path = self + .conf + .data_dir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .data_dir + .join(ztli.to_string()) + .join(wal_file_name.clone() + ".partial"); + + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) + { + /* Try to open existed partial file */ + wal_file = file; + partial = true; + } else { + /* Create and fill new partial file */ + partial = true; + match OpenOptions::new() + .create(true) + .write(true) + .open(&wal_file_partial_path) + { + Ok(mut file) => { + for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { + file.write_all(&ZERO_BLOCK)?; + } + wal_file = file; + } + Err(e) => { + error!("Failed to open log file {:?}: {}", &wal_file_path, e); + return Err(e.into()); + } + } + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; + + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + start_pos += bytes_to_write as u64; + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? */ + if start_pos.segment_offset(wal_seg_size) == 0 { + xlogoff = 0; + if partial { + fs::rename(&wal_file_partial_path, &wal_file_path)?; + } + } + } + Ok(()) + } +}