diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 0edac04b97..886cac869d 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -279,7 +279,7 @@ pub struct VoteResponse { * Proposer -> Acceptor message announcing proposer is elected and communicating * term history to it. */ -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ProposerElected { pub generation: Generation, // membership conf generation pub term: Term, @@ -1013,6 +1013,9 @@ where self.state.finish_change(&state).await?; } + // Switch into conf given by proposer conf if it is higher. + self.state.membership_switch(msg.mconf.clone()).await?; + let apg = AcceptorGreeting { node_id: self.node_id, mconf: self.state.mconf.clone(), @@ -1030,16 +1033,18 @@ where &mut self, msg: &VoteRequest, ) -> Result> { + if self.state.mconf.generation != msg.generation { + bail!( + "refusing {:?} due to generation mismatch: sk generation {}", + msg, + self.state.mconf.generation + ); + } // Once voted, we won't accept data from older proposers; flush // everything we've already received so that new proposer starts - // streaming at end of our WAL, without overlap. Currently we truncate - // WAL at streaming point, so this avoids truncating already committed - // WAL. - // - // TODO: it would be smoother to not truncate committed piece at - // handle_elected instead. Currently not a big deal, as proposer is the - // only source of WAL; with peer2peer recovery it would be more - // important. + // streaming at end of our WAL, without overlap. WAL is truncated at + // streaming point and commit_lsn may be advanced from peers, so this + // also avoids possible spurious attempt to truncate committed WAL. self.wal_store.flush_wal().await?; // initialize with refusal let mut resp = VoteResponse { @@ -1093,6 +1098,13 @@ where self.get_last_log_term(), self.flush_lsn() ); + if self.state.mconf.generation != msg.generation { + bail!( + "refusing {:?} due to generation mismatch: sk generation {}", + msg, + self.state.mconf.generation + ); + } if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; @@ -1263,11 +1275,24 @@ where msg: &AppendRequest, require_flush: bool, ) -> Result> { + // Refuse message on generation mismatch. On reconnect wp will get full + // configuration from greeting. + if self.state.mconf.generation != msg.h.generation { + bail!( + "refusing append request due to generation mismatch: request {}, sk {}", + msg.h.generation, + self.state.mconf.generation + ); + } + if self.state.acceptor_state.term < msg.h.term { bail!("got AppendRequest before ProposerElected"); } - // If our term is higher, immediately refuse the message. + // If our term is higher, immediately refuse the message. Send term only + // response; elected walproposer can never advance the term, so it will + // figure out the refusal from it -- which is important as term change + // should cause not just reconnection but whole walproposer re-election. if self.state.acceptor_state.term > msg.h.term { let resp = AppendResponse::term_only( self.state.mconf.generation, @@ -1468,6 +1493,13 @@ mod tests { let wal_store = DummyWalStore { lsn: Lsn(0) }; let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); + // Vote with generation mismatch should be rejected. + let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { + generation: SafekeeperGeneration::new(42), + term: 1, + }); + assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err()); + // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { generation: Generation::new(0), @@ -1532,6 +1564,16 @@ mod tests { }, ]), }; + + // check that elected msg with generation mismatch is rejected + let mut pem_gen_mismatch = pem.clone(); + pem_gen_mismatch.generation = SafekeeperGeneration::new(42); + assert!( + sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch)) + .await + .is_err() + ); + sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await .unwrap(); @@ -1590,6 +1632,21 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; + // check that append request with generation mismatch is rejected + let mut ar_hdr_gen_mismatch = ar_hdr.clone(); + ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42); + let append_request_gen_mismatch = AppendRequest { + h: ar_hdr_gen_mismatch, + wal_data: Bytes::from_static(b"b"), + }; + assert!( + sk.process_msg(&ProposerAcceptorMessage::AppendRequest( + append_request_gen_mismatch + )) + .await + .is_err() + ); + // do write ending at 2, it should be ok sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) .await diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index e437e6d2cd..7533005c35 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -268,7 +268,7 @@ where // Is switch allowed? if to.generation <= self.mconf.generation { info!( - "ignoring request to switch membership conf to lower {}, current conf {}", + "ignoring request to switch membership conf to {}, current conf {}", to, self.mconf ); } else { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index ed197a3f83..f0bac4b40a 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -574,6 +574,7 @@ impl Storage for PhysicalStorage { } self.pending_wal_truncation = false; + info!("truncated WAL to {}", end_pos); Ok(()) }