diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 227f45fbf4..debcda3789 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -164,7 +164,7 @@ fn walreceiver_main( // There might be some padding after the last full record, skip it. startpoint += startpoint.calc_padding(8u32); - debug!( + info!( "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", last_rec_lsn, startpoint, timelineid, end_of_wal ); diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 752226ee8b..589f4298fa 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -229,7 +229,7 @@ impl ReplicationConn { start_pos += send_size as u64; - debug!("Sent WAL to page server up to {}", end_pos); + debug!("sent WAL up to {}", end_pos); // Decide whether to reuse this file. If we don't set wal_file here // a new file will be opened next time. diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 348b782758..c6f636d3fe 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -10,6 +10,7 @@ use log::*; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; use std::cmp::max; +use std::cmp::min; use std::io; use std::io::Read; @@ -47,6 +48,7 @@ pub struct ServerInfo { /// Postgres server version pub pg_version: u32, pub system_id: SystemId, + pub tenant_id: ZTenantId, /// Zenith timelineid pub ztli: ZTimelineId, pub tli: TimeLineID, @@ -65,10 +67,9 @@ pub struct SafeKeeperState { /// information about server pub server: ServerInfo, /// Unique id of the last *elected* proposer we dealed with. Not needed - /// correctness, exists for monitoring purposes. + /// for correctness, exists for monitoring purposes. pub proposer_uuid: PgUuid, - /// part of WAL acknowledged by quorum (note that we might not have wal to - /// up this point locally) + /// part of WAL acknowledged by quorum and available locally pub commit_lsn: Lsn, /// minimal LSN which may be needed for recovery of some safekeeper (end lsn /// + 1 of last record streamed to everyone) @@ -84,6 +85,7 @@ impl SafeKeeperState { server: ServerInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ system_id: 0, /* Postgres system identifier */ + tenant_id: ZTenantId::from([0u8; 16]), ztli: ZTimelineId::from([0u8; 16]), tli: 0, wal_seg_size: 0, @@ -161,7 +163,7 @@ pub struct AppendRequestHeader { end_lsn: Lsn, /// LSN committed by quorum of safekeepers commit_lsn: Lsn, - /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery) + /// restart LSN position (minimal LSN which may be needed by proposer to perform recovery) restart_lsn: Lsn, // only for logging/debugging proposer_uuid: PgUuid, @@ -178,6 +180,9 @@ pub struct AppendResponse { // make much sense without taking epoch into account, as history can be // diverged. pub flush_lsn: Lsn, + // We report back our awareness about which WAL is committed, as this is + // a criterion for walproposer --sync mode exit + pub commit_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, } @@ -277,7 +282,9 @@ pub struct SafeKeeper { /// reading wal. pub flush_lsn: Lsn, pub tli: u32, - pub flushed_truncate_lsn: Lsn, + /// not-yet-flushed pairs of same named fields in s.* + pub commit_lsn: Lsn, + pub truncate_lsn: Lsn, pub storage: ST, pub s: SafeKeeperState, // persistent part pub elected_proposer_term: Term, // for monitoring/debugging @@ -292,7 +299,8 @@ where SafeKeeper { flush_lsn, tli, - flushed_truncate_lsn: Lsn(0), + commit_lsn: state.commit_lsn, + truncate_lsn: state.truncate_lsn, storage, s: state, elected_proposer_term: 0, @@ -323,13 +331,6 @@ where SK_PROTOCOL_VERSION ); } - if self.s.server.system_id != 0 && self.s.server.system_id != msg.system_id { - bail!( - "system identifier changed: got {}, expected {}", - msg.system_id, - self.s.server.system_id, - ); - } /* Postgres upgrade is not treated as fatal error */ if msg.pg_version != self.s.server.pg_version && self.s.server.pg_version != UNKNOWN_SERVER_VERSION @@ -342,6 +343,7 @@ where // set basic info about server, if not yet self.s.server.system_id = msg.system_id; + self.s.server.tenant_id = msg.tenant_id; self.s.server.ztli = msg.ztli; self.s.server.tli = msg.tli; self.s.server.wal_seg_size = msg.wal_seg_size; @@ -386,8 +388,8 @@ where // log first AppendRequest from this proposer if self.elected_proposer_term < msg.h.term { info!( - "start receiving WAL from timeline {} term {}", - self.s.server.ztli, msg.h.term, + "start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}", + self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn, ); self.elected_proposer_term = msg.h.term; } @@ -402,6 +404,7 @@ where let resp = AppendResponse { term: self.s.acceptor_state.term, epoch: self.s.acceptor_state.epoch, + commit_lsn: Lsn(0), flush_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), }; @@ -418,9 +421,13 @@ where * maximum (vcl) determined by WAL proposer during handshake. * Switching epoch means that node completes recovery and start writing in the WAL new data. * XXX: this is wrong, we must actively truncate not matching part of log. + * + * The non-strict inequality is important for us, as proposer in --sync mode doesn't + * generate new records, but to advance commit_lsn epoch switch must happen on majority. + * We can regard this as commit of empty entry in new epoch, this should be safe. */ if self.s.acceptor_state.epoch < msg.h.term - && msg.h.end_lsn > max(self.flush_lsn, msg.h.epoch_start_lsn) + && msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn) { info!("switched to new epoch {}", msg.h.term); self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ @@ -431,8 +438,20 @@ where } self.s.proposer_uuid = msg.h.proposer_uuid; - self.s.commit_lsn = msg.h.commit_lsn; - self.s.truncate_lsn = msg.h.restart_lsn; + // Advance commit_lsn taking into account what we have locally. + // xxx this is wrapped into epoch check because we overwrite wal + // instead of truncating it, so without it commit_lsn might include + // wrong part. Anyway, nobody is much interested in our commit_lsn while + // epoch switch hasn't happened, right? + if self.s.acceptor_state.epoch == msg.h.term { + let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); + // If new commit_lsn reached epoch switch, force sync of control file: + // walproposer in sync mode is very interested when this happens. + sync_control_file |= + commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; + self.commit_lsn = commit_lsn; + } + self.truncate_lsn = msg.h.restart_lsn; /* * Update restart LSN in control file. @@ -440,24 +459,26 @@ where * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= - self.flushed_truncate_lsn + (self.s.server.wal_seg_size as u64) < self.s.truncate_lsn; - self.storage.persist(&self.s, sync_control_file)?; + self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn; if sync_control_file { - self.flushed_truncate_lsn = self.s.truncate_lsn; + self.s.commit_lsn = self.commit_lsn; + self.s.truncate_lsn = self.truncate_lsn; } + self.storage.persist(&self.s, sync_control_file)?; let resp = AppendResponse { term: self.s.acceptor_state.term, epoch: self.s.acceptor_state.epoch, flush_lsn: self.flush_lsn, + commit_lsn: self.s.commit_lsn, // will be filled by caller code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), }; - trace!( - "processed AppendRequest of len {}, flush_lsn={:X}/{:>08X}, resp {:?}", + debug!( + "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, resp {:?}", msg.wal_data.len(), - (self.flush_lsn.0 >> 32) as u32, - self.flush_lsn.0 as u32, + msg.h.end_lsn, + msg.h.commit_lsn, &resp, ); Ok(AcceptorProposerMessage::AppendResponse(resp)) @@ -523,7 +544,7 @@ mod tests { let mut ar_hdr = AppendRequestHeader { term: 1, - epoch_start_lsn: Lsn(2), + epoch_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0), @@ -535,12 +556,12 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; - // check that AppendRequest before VCL doesn't switch epoch + // check that AppendRequest before epochStartLsn doesn't switch epoch let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert!(sk.storage.persisted_state.acceptor_state.epoch == 0); + assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0); - // but record after VCL does the switch + // but record at epochStartLsn does the switch ar_hdr.begin_lsn = Lsn(2); ar_hdr.end_lsn = Lsn(3); append_request = AppendRequest { @@ -549,6 +570,6 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert!(sk.storage.persisted_state.acceptor_state.epoch == 1); + assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1); } } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index a35387f6e9..e177e72ba5 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -217,7 +217,7 @@ impl Timeline { rmsg = shared_state.sk.process_msg(msg)?; // locally available commit lsn. flush_lsn can be smaller than // commit_lsn if we are catching up safekeeper. - commit_lsn = min(shared_state.sk.flush_lsn, shared_state.sk.s.commit_lsn); + commit_lsn = shared_state.sk.commit_lsn; // if this is AppendResponse, fill in proper hot standby feedback if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {