diff --git a/walkeeper/src/control_file.rs b/walkeeper/src/control_file.rs index 3570889fb1..7cc53edeb0 100644 --- a/walkeeper/src/control_file.rs +++ b/walkeeper/src/control_file.rs @@ -6,7 +6,7 @@ use lazy_static::lazy_static; use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; -use std::ops::{Deref, DerefMut}; +use std::ops::Deref; use std::path::{Path, PathBuf}; use tracing::*; @@ -38,60 +38,11 @@ lazy_static! { .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec"); } -pub trait StatePersister { - /// Persist safekeeper state on disk. - fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; -} - /// Storage should keep actual state inside of it. It should implement Deref /// trait to access state fields and have persist method for updating that state. -pub trait Storage: Deref + StatePersister { - /// Returns a guard which implements DeferMut trait and have persist method. - fn update_guard(&mut self) -> StateGuard - where - Self: Sized, - { - StateGuard::new(self.clone(), self) - } -} - -/// A guard that allows safekeeper state to be updated atomically. -pub struct StateGuard<'a, P: StatePersister> { - persister: &'a mut P, - state: SafeKeeperState, -} - -impl<'a, P> StateGuard<'a, P> -where - P: StatePersister, -{ - pub fn new(state: SafeKeeperState, persister: &'a mut P) -> Self { - StateGuard { persister, state } - } - - pub fn persist(&mut self) -> Result<()> { - self.persister.persist(&self.state) - } -} - -impl<'a, P> Deref for StateGuard<'a, P> -where - P: StatePersister, -{ - type Target = SafeKeeperState; - - fn deref(&self) -> &Self::Target { - &self.state - } -} - -impl<'a, P> DerefMut for StateGuard<'a, P> -where - P: StatePersister, -{ - fn deref_mut(&mut self) -> &mut SafeKeeperState { - &mut self.state - } +pub trait Storage: Deref { + /// Persist safekeeper state on disk and update internal state. + fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; } #[derive(Debug)] @@ -111,8 +62,7 @@ impl FileStorage { let tenant_id = zttid.tenant_id.to_string(); let timeline_id = zttid.timeline_id.to_string(); - let state = Self::load_control_file_conf(conf, zttid) - .context("failed to load from control file")?; + let state = Self::load_control_file_conf(conf, zttid)?; Ok(FileStorage { timeline_dir, @@ -222,8 +172,6 @@ impl FileStorage { } } -impl Storage for FileStorage {} - impl Deref for FileStorage { type Target = SafeKeeperState; @@ -232,7 +180,7 @@ impl Deref for FileStorage { } } -impl StatePersister for FileStorage { +impl Storage for FileStorage { // persists state durably to underlying storage // for description see https://lwn.net/Articles/457667/ fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index ed8eab16cf..a4747f2086 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -610,10 +610,10 @@ where // set basic info about server, if not yet // TODO: verify that is doesn't change after { - let mut state = self.state.update_guard(); + let mut state = self.state.clone(); state.server.system_id = msg.system_id; state.server.wal_seg_size = msg.wal_seg_size; - state.persist()?; + self.state.persist(&state)?; } // pass wal_seg_size to read WAL and find flush_lsn @@ -642,10 +642,10 @@ where term_history: self.get_term_history(), }; if self.state.acceptor_state.term < msg.term { - let mut state = self.state.update_guard(); + let mut state = self.state.clone(); state.acceptor_state.term = msg.term; // persist vote before sending it out - state.persist()?; + self.state.persist(&state)?; resp.term = self.state.acceptor_state.term; resp.vote_given = true as u64; @@ -657,9 +657,9 @@ where /// Bump our term if received a note from elected proposer with higher one fn bump_if_higher(&mut self, term: Term) -> Result<()> { if self.state.acceptor_state.term < term { - let mut state = self.state.update_guard(); + let mut state = self.state.clone(); state.acceptor_state.term = term; - state.persist()?; + self.state.persist(&state)?; } Ok(()) } @@ -694,9 +694,9 @@ where // and now adopt term history from proposer { - let mut state = self.state.update_guard(); + let mut state = self.state.clone(); state.acceptor_state.term_history = msg.term_history.clone(); - state.persist()?; + self.state.persist(&state)?; } info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -735,12 +735,12 @@ where /// Persist in-memory state to the disk. fn persist_control_file(&mut self) -> Result<()> { - let mut state = self.state.update_guard(); + let mut state = self.state.clone(); state.commit_lsn = self.inmem.commit_lsn; state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; state.proposer_uuid = self.inmem.proposer_uuid; - state.persist() + self.state.persist(&state) } /// Handle request to append WAL. @@ -837,16 +837,14 @@ mod tests { use std::ops::Deref; use super::*; - use crate::{control_file::StatePersister, wal_storage::Storage}; + use crate::wal_storage::Storage; // fake storage for tests struct InMemoryState { persisted_state: SafeKeeperState, } - impl control_file::Storage for InMemoryState {} - - impl StatePersister for InMemoryState { + impl control_file::Storage for InMemoryState { fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { self.persisted_state = s.clone(); Ok(())