Remove StateGuard

This commit is contained in:
Arthur Petukhovsky
2022-04-11 10:41:30 +00:00
parent b1bfc23318
commit f9c00e53de
2 changed files with 18 additions and 72 deletions

View File

@@ -6,7 +6,7 @@ use lazy_static::lazy_static;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use tracing::*;
@@ -38,60 +38,11 @@ lazy_static! {
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec");
}
pub trait StatePersister {
/// Persist safekeeper state on disk.
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
}
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.
pub trait Storage: Deref<Target = SafeKeeperState> + StatePersister {
/// Returns a guard which implements DeferMut trait and have persist method.
fn update_guard(&mut self) -> StateGuard<Self>
where
Self: Sized,
{
StateGuard::new(self.clone(), self)
}
}
/// A guard that allows safekeeper state to be updated atomically.
pub struct StateGuard<'a, P: StatePersister> {
persister: &'a mut P,
state: SafeKeeperState,
}
impl<'a, P> StateGuard<'a, P>
where
P: StatePersister,
{
pub fn new(state: SafeKeeperState, persister: &'a mut P) -> Self {
StateGuard { persister, state }
}
pub fn persist(&mut self) -> Result<()> {
self.persister.persist(&self.state)
}
}
impl<'a, P> Deref for StateGuard<'a, P>
where
P: StatePersister,
{
type Target = SafeKeeperState;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl<'a, P> DerefMut for StateGuard<'a, P>
where
P: StatePersister,
{
fn deref_mut(&mut self) -> &mut SafeKeeperState {
&mut self.state
}
pub trait Storage: Deref<Target = SafeKeeperState> {
/// Persist safekeeper state on disk and update internal state.
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
}
#[derive(Debug)]
@@ -111,8 +62,7 @@ impl FileStorage {
let tenant_id = zttid.tenant_id.to_string();
let timeline_id = zttid.timeline_id.to_string();
let state = Self::load_control_file_conf(conf, zttid)
.context("failed to load from control file")?;
let state = Self::load_control_file_conf(conf, zttid)?;
Ok(FileStorage {
timeline_dir,
@@ -222,8 +172,6 @@ impl FileStorage {
}
}
impl Storage for FileStorage {}
impl Deref for FileStorage {
type Target = SafeKeeperState;
@@ -232,7 +180,7 @@ impl Deref for FileStorage {
}
}
impl StatePersister for FileStorage {
impl Storage for FileStorage {
// persists state durably to underlying storage
// for description see https://lwn.net/Articles/457667/
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {

View File

@@ -610,10 +610,10 @@ where
// set basic info about server, if not yet
// TODO: verify that is doesn't change after
{
let mut state = self.state.update_guard();
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
state.persist()?;
self.state.persist(&state)?;
}
// pass wal_seg_size to read WAL and find flush_lsn
@@ -642,10 +642,10 @@ where
term_history: self.get_term_history(),
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.update_guard();
let mut state = self.state.clone();
state.acceptor_state.term = msg.term;
// persist vote before sending it out
state.persist()?;
self.state.persist(&state)?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
@@ -657,9 +657,9 @@ where
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.state.acceptor_state.term < term {
let mut state = self.state.update_guard();
let mut state = self.state.clone();
state.acceptor_state.term = term;
state.persist()?;
self.state.persist(&state)?;
}
Ok(())
}
@@ -694,9 +694,9 @@ where
// and now adopt term history from proposer
{
let mut state = self.state.update_guard();
let mut state = self.state.clone();
state.acceptor_state.term_history = msg.term_history.clone();
state.persist()?;
self.state.persist(&state)?;
}
info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -735,12 +735,12 @@ where
/// Persist in-memory state to the disk.
fn persist_control_file(&mut self) -> Result<()> {
let mut state = self.state.update_guard();
let mut state = self.state.clone();
state.commit_lsn = self.inmem.commit_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
state.persist()
self.state.persist(&state)
}
/// Handle request to append WAL.
@@ -837,16 +837,14 @@ mod tests {
use std::ops::Deref;
use super::*;
use crate::{control_file::StatePersister, wal_storage::Storage};
use crate::wal_storage::Storage;
// fake storage for tests
struct InMemoryState {
persisted_state: SafeKeeperState,
}
impl control_file::Storage for InMemoryState {}
impl StatePersister for InMemoryState {
impl control_file::Storage for InMemoryState {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
self.persisted_state = s.clone();
Ok(())