mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
WIP update state with closure
This commit is contained in:
@@ -38,8 +38,8 @@ lazy_static! {
|
||||
}
|
||||
|
||||
pub trait Storage {
|
||||
/// Persist safekeeper state on disk.
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
|
||||
fn persisted_state(&self) -> &SafeKeeperState;
|
||||
fn update_state<F: Fn(&mut SafeKeeperState)>(&mut self, f: F) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -48,19 +48,27 @@ pub struct FileStorage {
|
||||
timeline_dir: PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
persist_control_file_seconds: Histogram,
|
||||
|
||||
/// Last state persisted to disk.
|
||||
state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage {
|
||||
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(zttid);
|
||||
let tenant_id = zttid.tenant_id.to_string();
|
||||
let timeline_id = zttid.timeline_id.to_string();
|
||||
FileStorage {
|
||||
|
||||
let state = Self::load_control_file_conf(conf, zttid)
|
||||
.context("failed to load from control file")?;
|
||||
|
||||
Ok(FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
}
|
||||
state: state,
|
||||
})
|
||||
}
|
||||
|
||||
// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
@@ -139,9 +147,7 @@ impl FileStorage {
|
||||
})?;
|
||||
Ok(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage for FileStorage {
|
||||
// persists state durably to underlying storage
|
||||
// for description see https://lwn.net/Articles/457667/
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
@@ -205,6 +211,20 @@ impl Storage for FileStorage {
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage for FileStorage {
|
||||
fn persisted_state(&self) -> &SafeKeeperState {
|
||||
&self.state
|
||||
}
|
||||
|
||||
fn update_state<F: Fn(&mut SafeKeeperState)>(&mut self, f: F) -> Result<()> {
|
||||
let mut new_state = self.state.clone();
|
||||
f(&mut new_state);
|
||||
self.persist(&new_state)?;
|
||||
self.state = new_state;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::FileStorage;
|
||||
|
||||
@@ -490,13 +490,10 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
||||
/// LSN since the proposer appends WAL; determines epoch switch point.
|
||||
epoch_start_lsn: Lsn,
|
||||
|
||||
// not-yet-flushed pairs of same named fields in s.*
|
||||
// not-yet-flushed pairs of same named fields in control_store.persisted_state()
|
||||
pub commit_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
|
||||
// persisted to disk
|
||||
pub s: SafeKeeperState,
|
||||
|
||||
pub control_store: CTRL,
|
||||
pub wal_store: WAL,
|
||||
}
|
||||
@@ -511,8 +508,9 @@ where
|
||||
ztli: ZTimelineId,
|
||||
control_store: CTRL,
|
||||
wal_store: WAL,
|
||||
state: SafeKeeperState,
|
||||
) -> SafeKeeper<CTRL, WAL> {
|
||||
let state = control_store.persisted_state();
|
||||
|
||||
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
|
||||
panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
|
||||
}
|
||||
@@ -523,7 +521,6 @@ where
|
||||
epoch_start_lsn: Lsn(0),
|
||||
commit_lsn: state.commit_lsn,
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
s: state,
|
||||
control_store,
|
||||
wal_store,
|
||||
}
|
||||
@@ -531,7 +528,8 @@ where
|
||||
|
||||
/// Get history of term switches for the available WAL
|
||||
fn get_term_history(&self) -> TermHistory {
|
||||
self.s
|
||||
self.control_store
|
||||
.persisted_state()
|
||||
.acceptor_state
|
||||
.term_history
|
||||
.up_to(self.wal_store.flush_lsn())
|
||||
@@ -539,7 +537,10 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
fn get_epoch(&self) -> Term {
|
||||
self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn())
|
||||
self.control_store
|
||||
.persisted_state()
|
||||
.acceptor_state
|
||||
.get_epoch(self.wal_store.flush_lsn())
|
||||
}
|
||||
|
||||
/// Process message from proposer and possibly form reply. Concurrent
|
||||
@@ -566,6 +567,8 @@ where
|
||||
&mut self,
|
||||
msg: &ProposerGreeting,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
let state = self.control_store.persisted_state();
|
||||
|
||||
/* Check protocol compatibility */
|
||||
if msg.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
@@ -575,46 +578,47 @@ where
|
||||
);
|
||||
}
|
||||
/* Postgres upgrade is not treated as fatal error */
|
||||
if msg.pg_version != self.s.server.pg_version
|
||||
&& self.s.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
if msg.pg_version != state.server.pg_version
|
||||
&& state.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
info!(
|
||||
"incompatible server version {}, expected {}",
|
||||
msg.pg_version, self.s.server.pg_version
|
||||
msg.pg_version, state.server.pg_version
|
||||
);
|
||||
}
|
||||
if msg.tenant_id != self.s.tenant_id {
|
||||
if msg.tenant_id != state.tenant_id {
|
||||
bail!(
|
||||
"invalid tenant ID, got {}, expected {}",
|
||||
msg.tenant_id,
|
||||
self.s.tenant_id
|
||||
state.tenant_id
|
||||
);
|
||||
}
|
||||
if msg.ztli != self.s.timeline_id {
|
||||
if msg.ztli != state.timeline_id {
|
||||
bail!(
|
||||
"invalid timeline ID, got {}, expected {}",
|
||||
msg.ztli,
|
||||
self.s.timeline_id
|
||||
state.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
// set basic info about server, if not yet
|
||||
// TODO: verify that is doesn't change after
|
||||
self.s.server.system_id = msg.system_id;
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.control_store
|
||||
.persist(&self.s)
|
||||
.context("failed to persist shared state")?;
|
||||
self.control_store.update_state(|state| {
|
||||
state.server.system_id = msg.system_id;
|
||||
state.server.wal_seg_size = msg.wal_seg_size;
|
||||
}).context("failed to persist shared state")?;
|
||||
|
||||
let state = self.control_store.persisted_state();
|
||||
|
||||
// pass wal_seg_size to read WAL and find flush_lsn
|
||||
self.wal_store.init_storage(&self.s)?;
|
||||
self.wal_store.init_storage(state)?;
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
msg.proposer_id, self.s.acceptor_state.term
|
||||
msg.proposer_id, state.acceptor_state.term
|
||||
);
|
||||
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
|
||||
term: self.s.acceptor_state.term,
|
||||
term: state.acceptor_state.term,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -623,20 +627,24 @@ where
|
||||
&mut self,
|
||||
msg: &VoteRequest,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
let state = self.control_store.persisted_state();
|
||||
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
term: self.s.acceptor_state.term,
|
||||
term: state.acceptor_state.term,
|
||||
vote_given: false as u64,
|
||||
flush_lsn: self.wal_store.flush_lsn(),
|
||||
truncate_lsn: self.s.peer_horizon_lsn,
|
||||
truncate_lsn: state.peer_horizon_lsn,
|
||||
term_history: self.get_term_history(),
|
||||
};
|
||||
if self.s.acceptor_state.term < msg.term {
|
||||
self.s.acceptor_state.term = msg.term;
|
||||
// persist vote before sending it out
|
||||
self.control_store.persist(&self.s)?;
|
||||
resp.term = self.s.acceptor_state.term;
|
||||
if state.acceptor_state.term < msg.term {
|
||||
resp.term = msg.term;
|
||||
resp.vote_given = true as u64;
|
||||
|
||||
// persist vote before sending it out
|
||||
self.control_store.update_state(|state| {
|
||||
state.acceptor_state.term = msg.term;
|
||||
})?;
|
||||
}
|
||||
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
|
||||
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
|
||||
@@ -644,19 +652,22 @@ where
|
||||
|
||||
/// Bump our term if received a note from elected proposer with higher one
|
||||
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
|
||||
if self.s.acceptor_state.term < term {
|
||||
self.s.acceptor_state.term = term;
|
||||
self.control_store.persist(&self.s)?;
|
||||
if self.control_store.persisted_state().acceptor_state.term < term {
|
||||
self.control_store.update_state(|state| {
|
||||
state.acceptor_state.term = term;
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Form AppendResponse from current state.
|
||||
fn append_response(&self) -> AppendResponse {
|
||||
let state = self.control_store.persisted_state();
|
||||
|
||||
let ar = AppendResponse {
|
||||
term: self.s.acceptor_state.term,
|
||||
term: state.acceptor_state.term,
|
||||
flush_lsn: self.wal_store.flush_lsn(),
|
||||
commit_lsn: self.s.commit_lsn,
|
||||
commit_lsn: state.commit_lsn,
|
||||
// will be filled by the upper code to avoid bothering safekeeper
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
zenith_feedback: ZenithFeedback::empty(),
|
||||
@@ -669,7 +680,7 @@ where
|
||||
info!("received ProposerElected {:?}", msg);
|
||||
self.bump_if_higher(msg.term)?;
|
||||
// If our term is higher, ignore the message (next feedback will inform the compute)
|
||||
if self.s.acceptor_state.term > msg.term {
|
||||
if self.control_store.persisted_state().acceptor_state.term > msg.term {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -680,8 +691,9 @@ where
|
||||
self.wal_store.truncate_wal(msg.start_streaming_at)?;
|
||||
|
||||
// and now adopt term history from proposer
|
||||
self.s.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.control_store.persist(&self.s)?;
|
||||
self.control_store.update_state(|state| {
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
})?;
|
||||
|
||||
info!("start receiving WAL since {:?}", msg.start_streaming_at);
|
||||
|
||||
@@ -691,7 +703,8 @@ where
|
||||
/// Advance commit_lsn taking into account what we have locally
|
||||
fn update_commit_lsn(&mut self) -> Result<()> {
|
||||
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
|
||||
assert!(commit_lsn >= self.s.commit_lsn);
|
||||
let persisted_commit_lsn = self.control_store.persisted_state().commit_lsn;
|
||||
assert!(commit_lsn >= persisted_commit_lsn);
|
||||
|
||||
self.commit_lsn = commit_lsn;
|
||||
self.metrics.commit_lsn.set(self.commit_lsn.0 as f64);
|
||||
@@ -703,7 +716,7 @@ where
|
||||
// Also note that commit_lsn can reach epoch_start_lsn earlier
|
||||
// that we receive new epoch_start_lsn, and we still need to sync
|
||||
// control file in this case.
|
||||
if commit_lsn == self.epoch_start_lsn && self.s.commit_lsn != commit_lsn {
|
||||
if commit_lsn == self.epoch_start_lsn && persisted_commit_lsn != commit_lsn {
|
||||
self.sync_control_file()?;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user