diff --git a/walkeeper/src/control_file.rs b/walkeeper/src/control_file.rs index 8b4e618661..9dc6526bcf 100644 --- a/walkeeper/src/control_file.rs +++ b/walkeeper/src/control_file.rs @@ -38,8 +38,8 @@ lazy_static! { } pub trait Storage { - /// Persist safekeeper state on disk. - fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + fn persisted_state(&self) -> &SafeKeeperState; + fn update_state(&mut self, f: F) -> Result<()>; } #[derive(Debug)] @@ -48,19 +48,27 @@ pub struct FileStorage { timeline_dir: PathBuf, conf: SafeKeeperConf, persist_control_file_seconds: Histogram, + + /// Last state persisted to disk. + state: SafeKeeperState, } impl FileStorage { - pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { + pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result { let timeline_dir = conf.timeline_dir(zttid); let tenant_id = zttid.tenant_id.to_string(); let timeline_id = zttid.timeline_id.to_string(); - FileStorage { + + let state = Self::load_control_file_conf(conf, zttid) + .context("failed to load from control file")?; + + Ok(FileStorage { timeline_dir, conf: conf.clone(), persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS .with_label_values(&[&tenant_id, &timeline_id]), - } + state: state, + }) } // Check the magic/version in the on-disk data and deserialize it, if possible. @@ -139,9 +147,7 @@ impl FileStorage { })?; Ok(state) } -} -impl Storage for FileStorage { // persists state durably to underlying storage // for description see https://lwn.net/Articles/457667/ fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { @@ -205,6 +211,20 @@ impl Storage for FileStorage { } } +impl Storage for FileStorage { + fn persisted_state(&self) -> &SafeKeeperState { + &self.state + } + + fn update_state(&mut self, f: F) -> Result<()> { + let mut new_state = self.state.clone(); + f(&mut new_state); + self.persist(&new_state)?; + self.state = new_state; + Ok(()) + } +} + #[cfg(test)] mod test { use super::FileStorage; diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 2b6c707def..a5a125d376 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -490,13 +490,10 @@ pub struct SafeKeeper { /// LSN since the proposer appends WAL; determines epoch switch point. epoch_start_lsn: Lsn, - // not-yet-flushed pairs of same named fields in s.* + // not-yet-flushed pairs of same named fields in control_store.persisted_state() pub commit_lsn: Lsn, pub peer_horizon_lsn: Lsn, - // persisted to disk - pub s: SafeKeeperState, - pub control_store: CTRL, pub wal_store: WAL, } @@ -511,8 +508,9 @@ where ztli: ZTimelineId, control_store: CTRL, wal_store: WAL, - state: SafeKeeperState, ) -> SafeKeeper { + let state = control_store.persisted_state(); + if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id { panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id); } @@ -523,7 +521,6 @@ where epoch_start_lsn: Lsn(0), commit_lsn: state.commit_lsn, peer_horizon_lsn: state.peer_horizon_lsn, - s: state, control_store, wal_store, } @@ -531,7 +528,8 @@ where /// Get history of term switches for the available WAL fn get_term_history(&self) -> TermHistory { - self.s + self.control_store + .persisted_state() .acceptor_state .term_history .up_to(self.wal_store.flush_lsn()) @@ -539,7 +537,10 @@ where #[cfg(test)] fn get_epoch(&self) -> Term { - self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn()) + self.control_store + .persisted_state() + .acceptor_state + .get_epoch(self.wal_store.flush_lsn()) } /// Process message from proposer and possibly form reply. Concurrent @@ -566,6 +567,8 @@ where &mut self, msg: &ProposerGreeting, ) -> Result> { + let state = self.control_store.persisted_state(); + /* Check protocol compatibility */ if msg.protocol_version != SK_PROTOCOL_VERSION { bail!( @@ -575,46 +578,47 @@ where ); } /* Postgres upgrade is not treated as fatal error */ - if msg.pg_version != self.s.server.pg_version - && self.s.server.pg_version != UNKNOWN_SERVER_VERSION + if msg.pg_version != state.server.pg_version + && state.server.pg_version != UNKNOWN_SERVER_VERSION { info!( "incompatible server version {}, expected {}", - msg.pg_version, self.s.server.pg_version + msg.pg_version, state.server.pg_version ); } - if msg.tenant_id != self.s.tenant_id { + if msg.tenant_id != state.tenant_id { bail!( "invalid tenant ID, got {}, expected {}", msg.tenant_id, - self.s.tenant_id + state.tenant_id ); } - if msg.ztli != self.s.timeline_id { + if msg.ztli != state.timeline_id { bail!( "invalid timeline ID, got {}, expected {}", msg.ztli, - self.s.timeline_id + state.timeline_id ); } // set basic info about server, if not yet // TODO: verify that is doesn't change after - self.s.server.system_id = msg.system_id; - self.s.server.wal_seg_size = msg.wal_seg_size; - self.control_store - .persist(&self.s) - .context("failed to persist shared state")?; + self.control_store.update_state(|state| { + state.server.system_id = msg.system_id; + state.server.wal_seg_size = msg.wal_seg_size; + }).context("failed to persist shared state")?; + + let state = self.control_store.persisted_state(); // pass wal_seg_size to read WAL and find flush_lsn - self.wal_store.init_storage(&self.s)?; + self.wal_store.init_storage(state)?; info!( "processed greeting from proposer {:?}, sending term {:?}", - msg.proposer_id, self.s.acceptor_state.term + msg.proposer_id, state.acceptor_state.term ); Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { - term: self.s.acceptor_state.term, + term: state.acceptor_state.term, }))) } @@ -623,20 +627,24 @@ where &mut self, msg: &VoteRequest, ) -> Result> { + let state = self.control_store.persisted_state(); + // initialize with refusal let mut resp = VoteResponse { - term: self.s.acceptor_state.term, + term: state.acceptor_state.term, vote_given: false as u64, flush_lsn: self.wal_store.flush_lsn(), - truncate_lsn: self.s.peer_horizon_lsn, + truncate_lsn: state.peer_horizon_lsn, term_history: self.get_term_history(), }; - if self.s.acceptor_state.term < msg.term { - self.s.acceptor_state.term = msg.term; - // persist vote before sending it out - self.control_store.persist(&self.s)?; - resp.term = self.s.acceptor_state.term; + if state.acceptor_state.term < msg.term { + resp.term = msg.term; resp.vote_given = true as u64; + + // persist vote before sending it out + self.control_store.update_state(|state| { + state.acceptor_state.term = msg.term; + })?; } info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); Ok(Some(AcceptorProposerMessage::VoteResponse(resp))) @@ -644,19 +652,22 @@ where /// Bump our term if received a note from elected proposer with higher one fn bump_if_higher(&mut self, term: Term) -> Result<()> { - if self.s.acceptor_state.term < term { - self.s.acceptor_state.term = term; - self.control_store.persist(&self.s)?; + if self.control_store.persisted_state().acceptor_state.term < term { + self.control_store.update_state(|state| { + state.acceptor_state.term = term; + })?; } Ok(()) } /// Form AppendResponse from current state. fn append_response(&self) -> AppendResponse { + let state = self.control_store.persisted_state(); + let ar = AppendResponse { - term: self.s.acceptor_state.term, + term: state.acceptor_state.term, flush_lsn: self.wal_store.flush_lsn(), - commit_lsn: self.s.commit_lsn, + commit_lsn: state.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), zenith_feedback: ZenithFeedback::empty(), @@ -669,7 +680,7 @@ where info!("received ProposerElected {:?}", msg); self.bump_if_higher(msg.term)?; // If our term is higher, ignore the message (next feedback will inform the compute) - if self.s.acceptor_state.term > msg.term { + if self.control_store.persisted_state().acceptor_state.term > msg.term { return Ok(None); } @@ -680,8 +691,9 @@ where self.wal_store.truncate_wal(msg.start_streaming_at)?; // and now adopt term history from proposer - self.s.acceptor_state.term_history = msg.term_history.clone(); - self.control_store.persist(&self.s)?; + self.control_store.update_state(|state| { + state.acceptor_state.term_history = msg.term_history.clone(); + })?; info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -691,7 +703,8 @@ where /// Advance commit_lsn taking into account what we have locally fn update_commit_lsn(&mut self) -> Result<()> { let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn()); - assert!(commit_lsn >= self.s.commit_lsn); + let persisted_commit_lsn = self.control_store.persisted_state().commit_lsn; + assert!(commit_lsn >= persisted_commit_lsn); self.commit_lsn = commit_lsn; self.metrics.commit_lsn.set(self.commit_lsn.0 as f64); @@ -703,7 +716,7 @@ where // Also note that commit_lsn can reach epoch_start_lsn earlier // that we receive new epoch_start_lsn, and we still need to sync // control file in this case. - if commit_lsn == self.epoch_start_lsn && self.s.commit_lsn != commit_lsn { + if commit_lsn == self.epoch_start_lsn && persisted_commit_lsn != commit_lsn { self.sync_control_file()?; }