safekeeper: follow membership configuration rules (#10781)

## Problem

safekeepers must ignore walproposer messages with non matching
membership conf.

## Summary of changes

Make safekeepers reject vote request, proposer elected and append
request messages with non matching generation. Switch to the
configuration in the greeting message if it is higher.

In passing, fix one comment and WAL truncation.

Last part of https://github.com/neondatabase/neon/issues/9965
This commit is contained in:
Arseny Sher
2025-02-27 09:13:30 +03:00
committed by GitHub
parent c50b38ab72
commit 8b86cd1154
3 changed files with 69 additions and 11 deletions

View File

@@ -279,7 +279,7 @@ pub struct VoteResponse {
* Proposer -> Acceptor message announcing proposer is elected and communicating
* term history to it.
*/
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ProposerElected {
pub generation: Generation, // membership conf generation
pub term: Term,
@@ -1013,6 +1013,9 @@ where
self.state.finish_change(&state).await?;
}
// Switch into conf given by proposer conf if it is higher.
self.state.membership_switch(msg.mconf.clone()).await?;
let apg = AcceptorGreeting {
node_id: self.node_id,
mconf: self.state.mconf.clone(),
@@ -1030,16 +1033,18 @@ where
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
if self.state.mconf.generation != msg.generation {
bail!(
"refusing {:?} due to generation mismatch: sk generation {}",
msg,
self.state.mconf.generation
);
}
// Once voted, we won't accept data from older proposers; flush
// everything we've already received so that new proposer starts
// streaming at end of our WAL, without overlap. Currently we truncate
// WAL at streaming point, so this avoids truncating already committed
// WAL.
//
// TODO: it would be smoother to not truncate committed piece at
// handle_elected instead. Currently not a big deal, as proposer is the
// only source of WAL; with peer2peer recovery it would be more
// important.
// streaming at end of our WAL, without overlap. WAL is truncated at
// streaming point and commit_lsn may be advanced from peers, so this
// also avoids possible spurious attempt to truncate committed WAL.
self.wal_store.flush_wal().await?;
// initialize with refusal
let mut resp = VoteResponse {
@@ -1093,6 +1098,13 @@ where
self.get_last_log_term(),
self.flush_lsn()
);
if self.state.mconf.generation != msg.generation {
bail!(
"refusing {:?} due to generation mismatch: sk generation {}",
msg,
self.state.mconf.generation
);
}
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
state.acceptor_state.term = msg.term;
@@ -1263,11 +1275,24 @@ where
msg: &AppendRequest,
require_flush: bool,
) -> Result<Option<AcceptorProposerMessage>> {
// Refuse message on generation mismatch. On reconnect wp will get full
// configuration from greeting.
if self.state.mconf.generation != msg.h.generation {
bail!(
"refusing append request due to generation mismatch: request {}, sk {}",
msg.h.generation,
self.state.mconf.generation
);
}
if self.state.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
}
// If our term is higher, immediately refuse the message.
// If our term is higher, immediately refuse the message. Send term only
// response; elected walproposer can never advance the term, so it will
// figure out the refusal from it -- which is important as term change
// should cause not just reconnection but whole walproposer re-election.
if self.state.acceptor_state.term > msg.h.term {
let resp = AppendResponse::term_only(
self.state.mconf.generation,
@@ -1468,6 +1493,13 @@ mod tests {
let wal_store = DummyWalStore { lsn: Lsn(0) };
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
// Vote with generation mismatch should be rejected.
let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: SafekeeperGeneration::new(42),
term: 1,
});
assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: Generation::new(0),
@@ -1532,6 +1564,16 @@ mod tests {
},
]),
};
// check that elected msg with generation mismatch is rejected
let mut pem_gen_mismatch = pem.clone();
pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
assert!(
sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
.await
.is_err()
);
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
@@ -1590,6 +1632,21 @@ mod tests {
wal_data: Bytes::from_static(b"b"),
};
// check that append request with generation mismatch is rejected
let mut ar_hdr_gen_mismatch = ar_hdr.clone();
ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
let append_request_gen_mismatch = AppendRequest {
h: ar_hdr_gen_mismatch,
wal_data: Bytes::from_static(b"b"),
};
assert!(
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
append_request_gen_mismatch
))
.await
.is_err()
);
// do write ending at 2, it should be ok
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await

View File

@@ -268,7 +268,7 @@ where
// Is switch allowed?
if to.generation <= self.mconf.generation {
info!(
"ignoring request to switch membership conf to lower {}, current conf {}",
"ignoring request to switch membership conf to {}, current conf {}",
to, self.mconf
);
} else {

View File

@@ -574,6 +574,7 @@ impl Storage for PhysicalStorage {
}
self.pending_wal_truncation = false;
info!("truncated WAL to {}", end_pos);
Ok(())
}