diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 57cf379a96..120e6c769b 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -392,6 +392,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin): "lm_prefix": "prefix", "lm_message": "message", "set_commit_lsn": True, + "send_proposer_elected": True, "term": 2, "begin_lsn": begin_lsn, "epoch_start_lsn": epoch_start_lsn, diff --git a/vendor/postgres b/vendor/postgres index 08878b19d3..a70d892bb9 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 08878b19d3cae5a1bd765bf7396187b6b806c6ac +Subproject commit a70d892bb93e0a8a6cda8a8fccd4e4fbf408ea79 diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index 159ec17a9b..dc1905a5a7 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -7,7 +7,8 @@ use std::fmt::Display; use std::sync::Arc; use zenith_utils::lsn::Lsn; -use crate::safekeeper::AcceptorState; +use crate::safekeeper::Term; +use crate::safekeeper::TermHistory; use crate::timeline::CreateControlFile; use crate::timeline::GlobalTimelines; use crate::SafeKeeperConf; @@ -29,6 +30,7 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { .as_ref() } +/// Serialize through Display trait. fn display_serialize(z: &F, s: S) -> Result where S: Serializer, @@ -37,6 +39,14 @@ where s.serialize_str(&format!("{}", z)) } +/// Augment AcceptorState with epoch for convenience +#[derive(Debug, Serialize)] +struct AcceptorStateStatus { + term: Term, + epoch: Term, + term_history: TermHistory, +} + /// Info about timeline on safekeeper ready for reporting. 
#[derive(Debug, Serialize)] struct TimelineStatus { @@ -44,7 +54,7 @@ struct TimelineStatus { tenant_id: ZTenantId, #[serde(serialize_with = "display_serialize")] timeline_id: ZTimelineId, - acceptor_state: AcceptorState, + acceptor_state: AcceptorStateStatus, #[serde(serialize_with = "display_serialize")] commit_lsn: Lsn, #[serde(serialize_with = "display_serialize")] @@ -68,10 +78,16 @@ async fn timeline_status_handler(request: Request) -> Result Result<()> { let response = swh.timeline.get().process_msg(&greeting_request)?; match response { - AcceptorProposerMessage::Greeting(_) => Ok(()), + Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), _ => anyhow::bail!("not GreetingResponse"), } } +fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> { + // add new term to existing history + let history = swh.timeline.get().get_info().acceptor_state.term_history; + let history = history.up_to(lsn.checked_sub(1u64).unwrap()); + let mut history_entries = history.0; + history_entries.push(TermSwitchEntry { term, lsn }); + let history = TermHistory(history_entries); + + let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { + term, + start_streaming_at: lsn, + term_history: history, + }); + + swh.timeline.get().process_msg(&proposer_elected_request)?; + Ok(()) +} + #[derive(Serialize, Deserialize)] struct InsertedWAL { begin_lsn: Lsn, @@ -150,7 +176,7 @@ fn append_logical_message( let response = swh.timeline.get().process_msg(&append_request)?; let append_response = match response { - AcceptorProposerMessage::AppendResponse(resp) => resp, + Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, _ => anyhow::bail!("not AppendResponse"), }; diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 9498980802..a653c41922 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result}; use bytes::Bytes; +use bytes::BytesMut; 
use log::*; use postgres::{Client, Config, NoTls}; @@ -98,7 +99,7 @@ impl<'pg> ReceiveWalConn<'pg> { // Send message to the postgres fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> { - let mut buf = Vec::new(); + let mut buf = BytesMut::with_capacity(128); msg.serialize(&mut buf)?; self.pg_backend.write_message(&BeMessage::CopyData(&buf))?; Ok(()) @@ -147,7 +148,9 @@ impl<'pg> ReceiveWalConn<'pg> { .get() .process_msg(&msg) .with_context(|| "failed to process ProposerAcceptorMessage")?; - self.write_msg(&reply)?; + if let Some(reply) = reply { + self.write_msg(&reply)?; + } msg = self.read_msg()?; } } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 0b25241165..2a15bb3fc6 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -4,16 +4,16 @@ use anyhow::Context; use anyhow::{anyhow, bail, Result}; use byteorder::LittleEndian; use byteorder::ReadBytesExt; -use byteorder::WriteBytesExt; use bytes::Buf; +use bytes::BufMut; use bytes::Bytes; +use bytes::BytesMut; use log::*; use pageserver::waldecoder::WalStreamDecoder; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; -use std::cmp::max; use std::cmp::min; -use std::io; +use std::fmt; use std::io::Read; use lazy_static::lazy_static; @@ -37,6 +37,70 @@ const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. 
pub type Term = u64; +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct TermSwitchEntry { + pub term: Term, + pub lsn: Lsn, +} +#[derive(Clone, Serialize, Deserialize)] +pub struct TermHistory(pub Vec); + +impl TermHistory { + pub fn empty() -> TermHistory { + TermHistory(Vec::new()) + } + + // Parse TermHistory as n_entries followed by TermSwitchEntry pairs + pub fn from_bytes(mut bytes: Bytes) -> Result { + if bytes.remaining() < 4 { + bail!("TermHistory misses len"); + } + let n_entries = bytes.get_u32_le(); + let mut res = Vec::with_capacity(n_entries as usize); + for _ in 0..n_entries { + if bytes.remaining() < 16 { + bail!("TermHistory is incomplete"); + } + res.push(TermSwitchEntry { + term: bytes.get_u64_le(), + lsn: bytes.get_u64_le().into(), + }) + } + Ok(TermHistory(res)) + } + + /// Return copy of self with switches happening strictly after up_to + /// truncated. + pub fn up_to(&self, up_to: Lsn) -> TermHistory { + let mut res = Vec::with_capacity(self.0.len()); + for e in &self.0 { + if e.lsn > up_to { + break; + } + res.push(*e); + } + TermHistory(res) + } +} + +/// Display only latest entries for Debug. +impl fmt::Debug for TermHistory { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let n_printed = 20; + write!( + fmt, + "{}{:?}", + if self.0.len() > n_printed { "... " } else { "" }, + self.0 + .iter() + .rev() + .take(n_printed) + .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry + .collect::>() + ) + } +} + /// Unique id of proposer. Not needed for correctness, used for monitoring. type PgUuid = [u8; 16]; @@ -45,8 +109,21 @@ type PgUuid = [u8; 16]; pub struct AcceptorState { /// acceptor's last term it voted for (advanced in 1 phase) pub term: Term, - /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached). - pub epoch: Term, + /// History of term switches for safekeeper's WAL. + /// Actually it often goes *beyond* WAL contents as we adopt term history + /// from the proposer before recovery. 
+ pub term_history: TermHistory, +} + +impl AcceptorState { + /// acceptor's epoch is the term of the highest entry in the log + pub fn get_epoch(&self, flush_lsn: Lsn) -> Term { + let th = self.term_history.up_to(flush_lsn); + match th.0.last() { + Some(e) => e.term, + None => 0, + } + } } /// Information about Postgres. Safekeeper gets it once and then verifies @@ -91,7 +168,10 @@ impl SafeKeeperState { SafeKeeperState { magic: SK_MAGIC, format_version: SK_FORMAT_VERSION, - acceptor_state: AcceptorState { term: 0, epoch: 0 }, + acceptor_state: AcceptorState { + term: 0, + term_history: TermHistory::empty(), + }, server: ServerInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ system_id: 0, /* Postgres system identifier */ @@ -147,16 +227,28 @@ pub struct VoteRequest { /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { - term: Term, // not really needed, just a sanity check + term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. vote_given: u64, // fixme u64 due to padding - /// Safekeeper's log position, to let proposer choose the most advanced one - epoch: Term, + // Safekeeper flush_lsn (end of WAL) + history of term switches allow + // proposer to choose the most advanced one. flush_lsn: Lsn, truncate_lsn: Lsn, + term_history: TermHistory, +} + +/* + * Proposer -> Acceptor message announcing proposer is elected and communicating + * term history to it. + */ +#[derive(Debug)] +pub struct ProposerElected { + pub term: Term, + pub start_streaming_at: Lsn, + pub term_history: TermHistory, } /// Request with WAL message sent from proposer to safekeeper. Along the way it -/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. +/// communicates commit_lsn. 
#[derive(Debug)] pub struct AppendRequest { pub h: AppendRequestHeader, @@ -164,6 +256,7 @@ pub struct AppendRequest { } #[derive(Debug, Clone, Deserialize)] pub struct AppendRequestHeader { + // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, // LSN since the proposer appends WAL; determines epoch switch point. pub epoch_start_lsn: Lsn, @@ -185,7 +278,6 @@ pub struct AppendResponse { // Current term of the safekeeper; if it is higher than proposer's, the // compute is out of date. pub term: Term, - pub epoch: Term, // NOTE: this is physical end of wal on safekeeper; currently it doesn't // make much sense without taking epoch into account, as history can be // diverged. @@ -198,19 +290,32 @@ pub struct AppendResponse { pub hs_feedback: HotStandbyFeedback, } +impl AppendResponse { + fn term_only(term: Term) -> AppendResponse { + AppendResponse { + term, + flush_lsn: Lsn(0), + commit_lsn: Lsn(0), + disk_consistent_lsn: Lsn(0), + hs_feedback: HotStandbyFeedback::empty(), + } + } +} + /// Proposer -> Acceptor messages #[derive(Debug)] pub enum ProposerAcceptorMessage { Greeting(ProposerGreeting), VoteRequest(VoteRequest), + Elected(ProposerElected), AppendRequest(AppendRequest), } impl ProposerAcceptorMessage { /// Parse proposer message. - pub fn parse(msg: Bytes) -> Result { + pub fn parse(msg_bytes: Bytes) -> Result { // xxx using Reader is inefficient but easy to work with bincode - let mut stream = msg.reader(); + let mut stream = msg_bytes.reader(); // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is let tag = stream.read_u64::()? 
as u8 as char; match tag { @@ -222,6 +327,21 @@ impl ProposerAcceptorMessage { let msg = VoteRequest::des_from(&mut stream)?; Ok(ProposerAcceptorMessage::VoteRequest(msg)) } + 'e' => { + let mut msg_bytes = stream.into_inner(); + if msg_bytes.remaining() < 16 { + bail!("ProposerElected message is not complete"); + } + let term = msg_bytes.get_u64_le(); + let start_streaming_at = msg_bytes.get_u64_le().into(); + let term_history = TermHistory::from_bytes(msg_bytes)?; + let msg = ProposerElected { + term, + start_streaming_at, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) + } 'a' => { // read header followed by wal data let hdr = AppendRequestHeader::des_from(&mut stream)?; @@ -259,19 +379,33 @@ pub enum AcceptorProposerMessage { impl AcceptorProposerMessage { /// Serialize acceptor -> proposer message. - pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> { + pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> { match self { AcceptorProposerMessage::Greeting(msg) => { - stream.write_u64::('g' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('g' as u64); + buf.put_u64_le(msg.term); } AcceptorProposerMessage::VoteResponse(msg) => { - stream.write_u64::('v' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('v' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.vote_given); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.truncate_lsn.into()); + buf.put_u32_le(msg.term_history.0.len() as u32); + for e in &msg.term_history.0 { + buf.put_u64_le(e.term); + buf.put_u64_le(e.lsn.into()); + } } AcceptorProposerMessage::AppendResponse(msg) => { - stream.write_u64::('a' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('a' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.commit_lsn.into()); + buf.put_u64_le(msg.disk_consistent_lsn.into()); + buf.put_i64_le(msg.hs_feedback.ts); + buf.put_u64_le(msg.hs_feedback.xmin); + buf.put_u64_le(msg.hs_feedback.catalog_xmin); } 
} @@ -284,6 +418,8 @@ pub trait Storage { fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>; /// Write piece of wal in buf to disk and sync it. fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; + // Truncate WAL at specified LSN + fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>; } lazy_static! { @@ -357,8 +493,7 @@ pub struct SafeKeeper { pub commit_lsn: Lsn, pub truncate_lsn: Lsn, pub storage: ST, - pub s: SafeKeeperState, // persistent part - pub elected_proposer_term: Term, // for monitoring/debugging + pub s: SafeKeeperState, // persistent part decoder: WalStreamDecoder, } @@ -375,27 +510,40 @@ where truncate_lsn: state.truncate_lsn, storage, s: state, - elected_proposer_term: 0, decoder: WalStreamDecoder::new(Lsn(0)), } } + /// Get history of term switches for the available WAL + fn get_term_history(&self) -> TermHistory { + self.s.acceptor_state.term_history.up_to(self.flush_lsn) + } + + #[cfg(test)] + fn get_epoch(&self) -> Term { + self.s.acceptor_state.get_epoch(self.flush_lsn) + } + /// Process message from proposer and possibly form reply. Concurrent /// callers must exclude each other. pub fn process_msg( &mut self, msg: &ProposerAcceptorMessage, - ) -> Result { + ) -> Result> { match msg { ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), + ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg), ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg), } } /// Handle initial message from proposer: check its sanity and send my /// current term. 
- fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result { + fn handle_greeting( + &mut self, + msg: &ProposerGreeting, + ) -> Result> { /* Check protocol compatibility */ if msg.protocol_version != SK_PROTOCOL_VERSION { bail!( @@ -429,64 +577,106 @@ where "processed greeting from proposer {:?}, sending term {:?}", msg.proposer_id, self.s.acceptor_state.term ); - Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting { + Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { term: self.s.acceptor_state.term, - })) + }))) } /// Give vote for the given term, if we haven't done that previously. - fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result { + fn handle_vote_request( + &mut self, + msg: &VoteRequest, + ) -> Result> { // initialize with refusal let mut resp = VoteResponse { - term: msg.term, + term: self.s.acceptor_state.term, vote_given: false as u64, - epoch: 0, - flush_lsn: Lsn(0), - truncate_lsn: Lsn(0), + flush_lsn: self.flush_lsn, + truncate_lsn: self.s.truncate_lsn, + term_history: self.get_term_history(), }; if self.s.acceptor_state.term < msg.term { self.s.acceptor_state.term = msg.term; // persist vote before sending it out self.storage.persist(&self.s, true)?; + resp.term = self.s.acceptor_state.term; resp.vote_given = true as u64; - resp.epoch = self.s.acceptor_state.epoch; - resp.flush_lsn = self.flush_lsn; - resp.truncate_lsn = self.s.truncate_lsn; } info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); - Ok(AcceptorProposerMessage::VoteResponse(resp)) + Ok(Some(AcceptorProposerMessage::VoteResponse(resp))) + } + + /// Bump our term if received a note from elected proposer with higher one + fn bump_if_higher(&mut self, term: Term) -> Result<()> { + if self.s.acceptor_state.term < term { + self.s.acceptor_state.term = term; + self.storage.persist(&self.s, true)?; + } + Ok(()) + } + + /// Form AppendResponse from current state. 
+ fn append_response(&self) -> AppendResponse { + AppendResponse { + term: self.s.acceptor_state.term, + flush_lsn: self.flush_lsn, + commit_lsn: self.s.commit_lsn, + disk_consistent_lsn: Lsn(0), + // will be filled by the upper code to avoid bothering safekeeper + hs_feedback: HotStandbyFeedback::empty(), + } + } + + fn handle_elected(&mut self, msg: &ProposerElected) -> Result> { + info!("received ProposerElected {:?}", msg); + self.bump_if_higher(msg.term)?; + // If our term is higher, ignore the message (next feedback will inform the compute) + if self.s.acceptor_state.term > msg.term { + return Ok(None); + } + + // TODO: cross check divergence point + + // streaming must not create a hole + assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at); + + // truncate obsolete part of WAL + if self.flush_lsn != Lsn(0) { + self.storage + .truncate_wal(&self.s.server, msg.start_streaming_at)?; + } + // update our end of WAL pointer + self.flush_lsn = msg.start_streaming_at; + // and now adopt term history from proposer + self.s.acceptor_state.term_history = msg.term_history.clone(); + self.storage.persist(&self.s, true)?; + + info!("start receiving WAL since {:?}", msg.start_streaming_at); + + Ok(None) } /// Handle request to append WAL. #[allow(clippy::comparison_chain)] - fn handle_append_request(&mut self, msg: &AppendRequest) -> Result { - // log first AppendRequest from this proposer - if self.elected_proposer_term < msg.h.term { - info!( - "start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}", - self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn, - ); - self.elected_proposer_term = msg.h.term; + fn handle_append_request( + &mut self, + msg: &AppendRequest, + ) -> Result> { + if self.s.acceptor_state.term < msg.h.term { + bail!("got AppendRequest before ProposerElected"); } - // If our term is lower than elected proposer one, bump it. 
- if self.s.acceptor_state.term < msg.h.term { - self.s.acceptor_state.term = msg.h.term; - self.storage.persist(&self.s, true)?; - } - // OTOH, if it is higher, immediately refuse the message. - else if self.s.acceptor_state.term > msg.h.term { - let resp = AppendResponse { - term: self.s.acceptor_state.term, - epoch: self.s.acceptor_state.epoch, - commit_lsn: Lsn(0), - flush_lsn: Lsn(0), - disk_consistent_lsn: Lsn(0), - hs_feedback: HotStandbyFeedback::empty(), - }; - return Ok(AcceptorProposerMessage::AppendResponse(resp)); + // If our term is higher, immediately refuse the message. + if self.s.acceptor_state.term > msg.h.term { + let resp = AppendResponse::term_only(self.s.acceptor_state.term); + return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } + // After ProposerElected, which performs truncation, we should get only + // indeed append requests (but flush_lsn is advanced only on record + // boundary, so might be less). + assert!(self.flush_lsn <= msg.h.begin_lsn); + self.s.proposer_uuid = msg.h.proposer_uuid; let mut sync_control_file = false; @@ -530,48 +720,21 @@ where } } - /* - * Epoch switch happen when written WAL record cross the boundary. - * The boundary is maximum of last WAL position at this node (FlushLSN) and global - * maximum (vcl) determined by WAL proposer during handshake. - * Switching epoch means that node completes recovery and start writing in the WAL new data. - * XXX: this is wrong, we must actively truncate not matching part of log. - * - * The non-strict inequality is important for us, as proposer in --sync mode doesn't - * generate new records, but to advance commit_lsn epoch switch must happen on majority. - * We can regard this as commit of empty entry in new epoch, this should be safe. 
- */ - if self.s.acceptor_state.epoch < msg.h.term - && msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn) - { - info!( - "switched to new epoch {} on receival of request end_lsn={:?}, len={:?}", - msg.h.term, - msg.h.end_lsn, - msg.wal_data.len(), - ); - self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ - sync_control_file = true; - } if last_rec_lsn > self.flush_lsn { self.flush_lsn = last_rec_lsn; self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); } - // Advance commit_lsn taking into account what we have locally. xxx this - // is wrapped into epoch check because we overwrite wal instead of - // truncating it, so without it commit_lsn might include wrong part. - // Anyway, nobody is much interested in our commit_lsn while epoch - // switch hasn't happened, right? - // + // Advance commit_lsn taking into account what we have locally. // commit_lsn can be 0, being unknown to new walproposer while he hasn't // collected majority of its epoch acks yet, ignore it in this case. - if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) { + if msg.h.commit_lsn != Lsn(0) { let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); - // If new commit_lsn reached epoch switch, force sync of control file: - // walproposer in sync mode is very interested when this happens. - sync_control_file |= - commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; + // If new commit_lsn reached epoch switch, force sync of control + // file: walproposer in sync mode is very interested when this + // happens. Note: this is for sync-safekeepers mode only, as + // otherwise commit_lsn might jump over epoch_start_lsn. 
+ sync_control_file |= commit_lsn == msg.h.epoch_start_lsn; self.commit_lsn = commit_lsn; self.metrics .commit_lsn @@ -592,15 +755,7 @@ where } self.storage.persist(&self.s, sync_control_file)?; - let resp = AppendResponse { - term: self.s.acceptor_state.term, - epoch: self.s.acceptor_state.epoch, - flush_lsn: self.flush_lsn, - commit_lsn: self.s.commit_lsn, - disk_consistent_lsn: Lsn(0), - // will be filled by caller code to avoid bothering safekeeper - hs_feedback: HotStandbyFeedback::empty(), - }; + let resp = self.append_response(); info!( "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}", msg.wal_data.len(), @@ -609,7 +764,7 @@ where msg.h.truncate_lsn, &resp, ); - Ok(AcceptorProposerMessage::AppendResponse(resp)) + Ok(Some(AcceptorProposerMessage::AppendResponse(resp))) } } @@ -631,6 +786,10 @@ mod tests { fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> { Ok(()) } + + fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> { + Ok(()) + } } #[test] @@ -644,7 +803,7 @@ mod tests { let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); let mut vote_resp = sk.process_msg(&vote_request); match vote_resp.unwrap() { - AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), r => panic!("unexpected response: {:?}", r), } @@ -658,7 +817,7 @@ mod tests { // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); match vote_resp.unwrap() { - AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), r => panic!("unexpected response: {:?}", r), } } @@ -684,10 +843,21 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; + let pem = ProposerElected { + term: 1, + start_streaming_at: Lsn(1), + 
term_history: TermHistory(vec![TermSwitchEntry { + term: 1, + lsn: Lsn(3), + }]), + }; + sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) + .unwrap(); + // check that AppendRequest before epochStartLsn doesn't switch epoch let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0); + assert_eq!(sk.get_epoch(), 0); // but record at epochStartLsn does the switch ar_hdr.begin_lsn = Lsn(2); @@ -698,6 +868,7 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1); + sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %) + assert_eq!(sk.get_epoch(), 1); } } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index b2698faa82..9e48a833d4 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -317,8 +317,11 @@ impl Timeline { } /// Pass arrived message to the safekeeper. 
- pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result { - let mut rmsg: AcceptorProposerMessage; + pub fn process_msg( + &self, + msg: &ProposerAcceptorMessage, + ) -> Result> { + let mut rmsg: Option; let commit_lsn: Lsn; { let mut shared_state = self.mutex.lock().unwrap(); @@ -328,7 +331,7 @@ impl Timeline { commit_lsn = shared_state.sk.commit_lsn; // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn - if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg { + if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; resp.disk_consistent_lsn = state.disk_consistent_lsn; @@ -596,6 +599,82 @@ impl Storage for FileStorage { } Ok(()) } + + fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> { + let partial; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + let wal_seg_size = server.wal_seg_size as usize; + let ztli = server.ztli; + + /* Extract WAL location for this block */ + let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; + + /* Open file */ + let mut segno = end_pos.segment_number(wal_seg_size); + // note: we basically don't support changing pg timeline + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name + ".partial"); + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else { + wal_file = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path)?; + partial = true; + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + while xlogoff < wal_seg_size { + let bytes_to_write = min(XLOG_BLCKSZ, 
wal_seg_size - xlogoff); + wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; + xlogoff += bytes_to_write; + } + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + if !partial { + // Make segment partial once again + fs::rename(&wal_file_path, &wal_file_partial_path)?; + } + // Remove all subsequent segments + loop { + segno += 1; + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone() + ".partial"); + // TODO: better use fs::try_exists which is currently available only in nightly build + if wal_file_path.exists() { + fs::remove_file(&wal_file_path)?; + } else if wal_file_partial_path.exists() { + fs::remove_file(&wal_file_partial_path)?; + } else { + break; + } + } + Ok(()) + } } #[cfg(test)]