From d1f0b1eda400c974d0548b26b408bed035997475 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 31 Aug 2021 21:57:42 +0300 Subject: [PATCH] Adapt safekeepers to --sync-safekeepers walproposer mode. 1) Do epoch switch without record from new epoch, immediately after recovery -- --sync-safekeepers mode doesn't generate new records. 2) Fix commit_lsn advancement by taking into account wal we have locally -- setting it further is incorrect. 3) Report it back to walproposer so he knows when sync is done. 4) Remove system id check as it is unknown in sync mode. And make logging slightly better. ref #439 --- pageserver/src/walreceiver.rs | 2 +- walkeeper/src/replication.rs | 2 +- walkeeper/src/safekeeper.rs | 81 ++++++++++++++++++++++------------- walkeeper/src/timeline.rs | 2 +- 4 files changed, 54 insertions(+), 33 deletions(-) diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 227f45fbf4..debcda3789 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -164,7 +164,7 @@ fn walreceiver_main( // There might be some padding after the last full record, skip it. startpoint += startpoint.calc_padding(8u32); - debug!( + info!( "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", last_rec_lsn, startpoint, timelineid, end_of_wal ); diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 752226ee8b..589f4298fa 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -229,7 +229,7 @@ impl ReplicationConn { start_pos += send_size as u64; - debug!("Sent WAL to page server up to {}", end_pos); + debug!("sent WAL up to {}", end_pos); // Decide whether to reuse this file. If we don't set wal_file here // a new file will be opened next time. diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 348b782758..c6f636d3fe 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -10,6 +10,7 @@ use log::*; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; use std::cmp::max; +use std::cmp::min; use std::io; use std::io::Read; @@ -47,6 +48,7 @@ pub struct ServerInfo { /// Postgres server version pub pg_version: u32, pub system_id: SystemId, + pub tenant_id: ZTenantId, /// Zenith timelineid pub ztli: ZTimelineId, pub tli: TimeLineID, @@ -65,10 +67,9 @@ pub struct SafeKeeperState { /// information about server pub server: ServerInfo, /// Unique id of the last *elected* proposer we dealed with. Not needed - /// correctness, exists for monitoring purposes. + /// for correctness, exists for monitoring purposes. pub proposer_uuid: PgUuid, - /// part of WAL acknowledged by quorum (note that we might not have wal to - /// up this point locally) + /// part of WAL acknowledged by quorum and available locally pub commit_lsn: Lsn, /// minimal LSN which may be needed for recovery of some safekeeper (end lsn /// + 1 of last record streamed to everyone) @@ -84,6 +85,7 @@ impl SafeKeeperState { server: ServerInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ system_id: 0, /* Postgres system identifier */ + tenant_id: ZTenantId::from([0u8; 16]), ztli: ZTimelineId::from([0u8; 16]), tli: 0, wal_seg_size: 0, @@ -161,7 +163,7 @@ pub struct AppendRequestHeader { end_lsn: Lsn, /// LSN committed by quorum of safekeepers commit_lsn: Lsn, - /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery) + /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery) restart_lsn: Lsn, // only for logging/debugging proposer_uuid: PgUuid, @@ -178,6 +180,9 @@ pub struct AppendResponse { // make much sense without taking epoch into account, as history can be // diverged. pub flush_lsn: Lsn, + // We report back our awareness about which WAL is committed, as this is + // a criterion for walproposer --sync mode exit + pub commit_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, } @@ -277,7 +282,9 @@ pub struct SafeKeeper { /// reading wal. pub flush_lsn: Lsn, pub tli: u32, - pub flushed_truncate_lsn: Lsn, + /// not-yet-flushed pairs of same named fields in s.* + pub commit_lsn: Lsn, + pub truncate_lsn: Lsn, pub storage: ST, pub s: SafeKeeperState, // persistent part pub elected_proposer_term: Term, // for monitoring/debugging @@ -292,7 +299,8 @@ where SafeKeeper { flush_lsn, tli, - flushed_truncate_lsn: Lsn(0), + commit_lsn: state.commit_lsn, + truncate_lsn: state.truncate_lsn, storage, s: state, elected_proposer_term: 0, @@ -323,13 +331,6 @@ where SK_PROTOCOL_VERSION ); } - if self.s.server.system_id != 0 && self.s.server.system_id != msg.system_id { - bail!( - "system identifier changed: got {}, expected {}", - msg.system_id, - self.s.server.system_id, - ); - } /* Postgres upgrade is not treated as fatal error */ if msg.pg_version != self.s.server.pg_version && self.s.server.pg_version != UNKNOWN_SERVER_VERSION @@ -342,6 +343,7 @@ where // set basic info about server, if not yet self.s.server.system_id = msg.system_id; + self.s.server.tenant_id = msg.tenant_id; self.s.server.ztli = msg.ztli; self.s.server.tli = msg.tli; self.s.server.wal_seg_size = msg.wal_seg_size; @@ -386,8 +388,8 @@ where // log first AppendRequest from this proposer if self.elected_proposer_term < msg.h.term { info!( - "start receiving WAL from timeline {} term {}", - self.s.server.ztli, msg.h.term, + "start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}", + self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn, ); self.elected_proposer_term = msg.h.term; } @@ -402,6 +404,7 @@ where let resp = AppendResponse { term: self.s.acceptor_state.term, epoch: self.s.acceptor_state.epoch, + commit_lsn: Lsn(0), flush_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), }; @@ -418,9 +421,13 @@ where * maximum (vcl) determined by WAL proposer during handshake. * Switching epoch means that node completes recovery and start writing in the WAL new data. * XXX: this is wrong, we must actively truncate not matching part of log. + * + * The non-strict inequality is important for us, as proposer in --sync mode doesn't + * generate new records, but to advance commit_lsn epoch switch must happen on majority. + * We can regard this as commit of empty entry in new epoch, this should be safe. */ if self.s.acceptor_state.epoch < msg.h.term - && msg.h.end_lsn > max(self.flush_lsn, msg.h.epoch_start_lsn) + && msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn) { info!("switched to new epoch {}", msg.h.term); self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ @@ -431,8 +438,20 @@ where } self.s.proposer_uuid = msg.h.proposer_uuid; - self.s.commit_lsn = msg.h.commit_lsn; - self.s.truncate_lsn = msg.h.restart_lsn; + // Advance commit_lsn taking into account what we have locally. + // xxx this is wrapped into epoch check because we overwrite wal + // instead of truncating it, so without it commit_lsn might include + // wrong part. Anyway, nobody is much interested in our commit_lsn while + // epoch switch hasn't happened, right? + if self.s.acceptor_state.epoch == msg.h.term { + let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); + // If new commit_lsn reached epoch switch, force sync of control file: + // walproposer in sync mode is very interested when this happens. + sync_control_file |= + commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; + self.commit_lsn = commit_lsn; + } + self.truncate_lsn = msg.h.restart_lsn; /* * Update restart LSN in control file. @@ -440,24 +459,26 @@ where * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= - self.flushed_truncate_lsn + (self.s.server.wal_seg_size as u64) < self.s.truncate_lsn; - self.storage.persist(&self.s, sync_control_file)?; + self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn; if sync_control_file { - self.flushed_truncate_lsn = self.s.truncate_lsn; + self.s.commit_lsn = self.commit_lsn; + self.s.truncate_lsn = self.truncate_lsn; } + self.storage.persist(&self.s, sync_control_file)?; let resp = AppendResponse { term: self.s.acceptor_state.term, epoch: self.s.acceptor_state.epoch, flush_lsn: self.flush_lsn, + commit_lsn: self.s.commit_lsn, // will be filled by caller code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), }; - trace!( - "processed AppendRequest of len {}, flush_lsn={:X}/{:>08X}, resp {:?}", + debug!( + "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, resp {:?}", msg.wal_data.len(), - (self.flush_lsn.0 >> 32) as u32, - self.flush_lsn.0 as u32, + msg.h.end_lsn, + msg.h.commit_lsn, &resp, ); Ok(AcceptorProposerMessage::AppendResponse(resp)) @@ -523,7 +544,7 @@ mod tests { let mut ar_hdr = AppendRequestHeader { term: 1, - epoch_start_lsn: Lsn(2), + epoch_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0), @@ -535,12 +556,12 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; - // check that AppendRequest before VCL doesn't switch epoch + // check that AppendRequest before epochStartLsn doesn't switch epoch let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert!(sk.storage.persisted_state.acceptor_state.epoch == 0); + assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0); - // but record after VCL does the switch + // but record at epochStartLsn does the switch ar_hdr.begin_lsn = Lsn(2); ar_hdr.end_lsn = Lsn(3); append_request = AppendRequest { @@ -549,6 +570,6 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert!(sk.storage.persisted_state.acceptor_state.epoch == 1); + assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1); } } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index a35387f6e9..e177e72ba5 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -217,7 +217,7 @@ impl Timeline { rmsg = shared_state.sk.process_msg(msg)?; // locally available commit lsn. flush_lsn can be smaller than // commit_lsn if we are catching up safekeeper. - commit_lsn = min(shared_state.sk.flush_lsn, shared_state.sk.s.commit_lsn); + commit_lsn = shared_state.sk.commit_lsn; // if this is AppendResponse, fill in proper hot standby feedback if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {