From 8b86cd115408942767175c8b9544babd18600209 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 27 Feb 2025 09:13:30 +0300 Subject: [PATCH] safekeeper: follow membership configuration rules (#10781) ## Problem safekeepers must ignore walproposer messages with non matching membership conf. ## Summary of changes Make safekeepers reject vote request, proposer elected and append request messages with non matching generation. Switch to the configuration in the greeting message if it is higher. In passing, fix one comment and WAL truncation. Last part of https://github.com/neondatabase/neon/issues/9965 --- safekeeper/src/safekeeper.rs | 77 ++++++++++++++++++++++++++++++----- safekeeper/src/state.rs | 2 +- safekeeper/src/wal_storage.rs | 1 + 3 files changed, 69 insertions(+), 11 deletions(-) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 0edac04b97..886cac869d 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -279,7 +279,7 @@ pub struct VoteResponse { * Proposer -> Acceptor message announcing proposer is elected and communicating * term history to it. */ -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ProposerElected { pub generation: Generation, // membership conf generation pub term: Term, @@ -1013,6 +1013,9 @@ where self.state.finish_change(&state).await?; } + // Switch into conf given by proposer conf if it is higher. + self.state.membership_switch(msg.mconf.clone()).await?; + let apg = AcceptorGreeting { node_id: self.node_id, mconf: self.state.mconf.clone(), @@ -1030,16 +1033,18 @@ where &mut self, msg: &VoteRequest, ) -> Result> { + if self.state.mconf.generation != msg.generation { + bail!( + "refusing {:?} due to generation mismatch: sk generation {}", + msg, + self.state.mconf.generation + ); + } // Once voted, we won't accept data from older proposers; flush // everything we've already received so that new proposer starts - // streaming at end of our WAL, without overlap. Currently we truncate - // WAL at streaming point, so this avoids truncating already committed - // WAL. - // - // TODO: it would be smoother to not truncate committed piece at - // handle_elected instead. Currently not a big deal, as proposer is the - // only source of WAL; with peer2peer recovery it would be more - // important. + // streaming at end of our WAL, without overlap. WAL is truncated at + // streaming point and commit_lsn may be advanced from peers, so this + // also avoids possible spurious attempt to truncate committed WAL. self.wal_store.flush_wal().await?; // initialize with refusal let mut resp = VoteResponse { @@ -1093,6 +1098,13 @@ where self.get_last_log_term(), self.flush_lsn() ); + if self.state.mconf.generation != msg.generation { + bail!( + "refusing {:?} due to generation mismatch: sk generation {}", + msg, + self.state.mconf.generation + ); + } if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; @@ -1263,11 +1275,24 @@ where msg: &AppendRequest, require_flush: bool, ) -> Result> { + // Refuse message on generation mismatch. On reconnect wp will get full + // configuration from greeting. + if self.state.mconf.generation != msg.h.generation { + bail!( + "refusing append request due to generation mismatch: request {}, sk {}", + msg.h.generation, + self.state.mconf.generation + ); + } + if self.state.acceptor_state.term < msg.h.term { bail!("got AppendRequest before ProposerElected"); } - // If our term is higher, immediately refuse the message. + // If our term is higher, immediately refuse the message. Send term only + // response; elected walproposer can never advance the term, so it will + // figure out the refusal from it -- which is important as term change + // should cause not just reconnection but whole walproposer re-election. if self.state.acceptor_state.term > msg.h.term { let resp = AppendResponse::term_only( self.state.mconf.generation, @@ -1468,6 +1493,13 @@ mod tests { let wal_store = DummyWalStore { lsn: Lsn(0) }; let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); + // Vote with generation mismatch should be rejected. + let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { + generation: SafekeeperGeneration::new(42), + term: 1, + }); + assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err()); + // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { generation: Generation::new(0), @@ -1532,6 +1564,16 @@ mod tests { }, ]), }; + + // check that elected msg with generation mismatch is rejected + let mut pem_gen_mismatch = pem.clone(); + pem_gen_mismatch.generation = SafekeeperGeneration::new(42); + assert!( + sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch)) + .await + .is_err() + ); + sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await .unwrap(); @@ -1590,6 +1632,21 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; + // check that append request with generation mismatch is rejected + let mut ar_hdr_gen_mismatch = ar_hdr.clone(); + ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42); + let append_request_gen_mismatch = AppendRequest { + h: ar_hdr_gen_mismatch, + wal_data: Bytes::from_static(b"b"), + }; + assert!( + sk.process_msg(&ProposerAcceptorMessage::AppendRequest( + append_request_gen_mismatch + )) + .await + .is_err() + ); + // do write ending at 2, it should be ok sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) .await diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index e437e6d2cd..7533005c35 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -268,7 +268,7 @@ where // Is switch allowed? if to.generation <= self.mconf.generation { info!( - "ignoring request to switch membership conf to lower {}, current conf {}", + "ignoring request to switch membership conf to {}, current conf {}", to, self.mconf ); } else { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index ed197a3f83..f0bac4b40a 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -574,6 +574,7 @@ impl Storage for PhysicalStorage { } self.pending_wal_truncation = false; + info!("truncated WAL to {}", end_pos); Ok(()) }