Compare commits

...

3 Commits

Author SHA1 Message Date
Arthur Petukhovsky
8d9b5d4a98 WIP update state with closure 2022-03-20 13:34:52 +00:00
Arthur Petukhovsky
e43c42e457 Fix sync on epoch_start_lsn reach 2022-03-18 09:54:20 +00:00
Arthur Petukhovsky
1e57769151 Fix commit_lsn calculations 2022-03-18 09:54:20 +00:00
2 changed files with 146 additions and 84 deletions

View File

@@ -38,8 +38,8 @@ lazy_static! {
}
pub trait Storage {
/// Persist safekeeper state on disk.
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
fn persisted_state(&self) -> &SafeKeeperState;
fn update_state<F: Fn(&mut SafeKeeperState)>(&mut self, f: F) -> Result<()>;
}
#[derive(Debug)]
@@ -48,19 +48,27 @@ pub struct FileStorage {
timeline_dir: PathBuf,
conf: SafeKeeperConf,
persist_control_file_seconds: Histogram,
/// Last state persisted to disk.
state: SafeKeeperState,
}
impl FileStorage {
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage {
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
let timeline_dir = conf.timeline_dir(zttid);
let tenant_id = zttid.tenant_id.to_string();
let timeline_id = zttid.timeline_id.to_string();
FileStorage {
let state = Self::load_control_file_conf(conf, zttid)
.context("failed to load from control file")?;
Ok(FileStorage {
timeline_dir,
conf: conf.clone(),
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
.with_label_values(&[&tenant_id, &timeline_id]),
}
state: state,
})
}
// Check the magic/version in the on-disk data and deserialize it, if possible.
@@ -139,9 +147,7 @@ impl FileStorage {
})?;
Ok(state)
}
}
impl Storage for FileStorage {
// persists state durably to underlying storage
// for description see https://lwn.net/Articles/457667/
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
@@ -205,6 +211,20 @@ impl Storage for FileStorage {
}
}
impl Storage for FileStorage {
fn persisted_state(&self) -> &SafeKeeperState {
&self.state
}
fn update_state<F: Fn(&mut SafeKeeperState)>(&mut self, f: F) -> Result<()> {
let mut new_state = self.state.clone();
f(&mut new_state);
self.persist(&new_state)?;
self.state = new_state;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::FileStorage;

View File

@@ -470,14 +470,12 @@ struct SafeKeeperMetrics {
}
impl SafeKeeperMetrics {
fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, commit_lsn: Lsn) -> Self {
fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let m = Self {
Self {
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
};
m.commit_lsn.set(u64::from(commit_lsn) as f64);
m
}
}
}
@@ -487,10 +485,14 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
// Cached metrics so we don't have to recompute labels on each update.
metrics: SafeKeeperMetrics,
/// not-yet-flushed pairs of same named fields in s.*
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
global_commit_lsn: Lsn,
/// LSN since the proposer appends WAL; determines epoch switch point.
epoch_start_lsn: Lsn,
// not-yet-flushed pairs of same named fields in control_store.persisted_state()
pub commit_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub s: SafeKeeperState, // persistent part
pub control_store: CTRL,
pub wal_store: WAL,
@@ -506,17 +508,19 @@ where
ztli: ZTimelineId,
control_store: CTRL,
wal_store: WAL,
state: SafeKeeperState,
) -> SafeKeeper<CTRL, WAL> {
let state = control_store.persisted_state();
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
}
SafeKeeper {
metrics: SafeKeeperMetrics::new(state.tenant_id, ztli, state.commit_lsn),
metrics: SafeKeeperMetrics::new(state.tenant_id, ztli),
global_commit_lsn: state.commit_lsn,
epoch_start_lsn: Lsn(0),
commit_lsn: state.commit_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
s: state,
control_store,
wal_store,
}
@@ -524,7 +528,8 @@ where
/// Get history of term switches for the available WAL
fn get_term_history(&self) -> TermHistory {
self.s
self.control_store
.persisted_state()
.acceptor_state
.term_history
.up_to(self.wal_store.flush_lsn())
@@ -532,7 +537,10 @@ where
#[cfg(test)]
fn get_epoch(&self) -> Term {
self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn())
self.control_store
.persisted_state()
.acceptor_state
.get_epoch(self.wal_store.flush_lsn())
}
/// Process message from proposer and possibly form reply. Concurrent
@@ -559,6 +567,8 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
let state = self.control_store.persisted_state();
/* Check protocol compatibility */
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
@@ -568,49 +578,47 @@ where
);
}
/* Postgres upgrade is not treated as fatal error */
if msg.pg_version != self.s.server.pg_version
&& self.s.server.pg_version != UNKNOWN_SERVER_VERSION
if msg.pg_version != state.server.pg_version
&& state.server.pg_version != UNKNOWN_SERVER_VERSION
{
info!(
"incompatible server version {}, expected {}",
msg.pg_version, self.s.server.pg_version
msg.pg_version, state.server.pg_version
);
}
if msg.tenant_id != self.s.tenant_id {
if msg.tenant_id != state.tenant_id {
bail!(
"invalid tenant ID, got {}, expected {}",
msg.tenant_id,
self.s.tenant_id
state.tenant_id
);
}
if msg.ztli != self.s.timeline_id {
if msg.ztli != state.timeline_id {
bail!(
"invalid timeline ID, got {}, expected {}",
msg.ztli,
self.s.timeline_id
state.timeline_id
);
}
// set basic info about server, if not yet
// TODO: verify that it doesn't change after
self.s.server.system_id = msg.system_id;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.control_store
.persist(&self.s)
.context("failed to persist shared state")?;
self.control_store.update_state(|state| {
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
}).context("failed to persist shared state")?;
let state = self.control_store.persisted_state();
// pass wal_seg_size to read WAL and find flush_lsn
self.wal_store.init_storage(&self.s)?;
// update tenant_id/timeline_id in metrics
self.metrics = SafeKeeperMetrics::new(msg.tenant_id, msg.ztli, self.commit_lsn);
self.wal_store.init_storage(state)?;
info!(
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
msg.proposer_id, state.acceptor_state.term
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.s.acceptor_state.term,
term: state.acceptor_state.term,
})))
}
@@ -619,20 +627,24 @@ where
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
let state = self.control_store.persisted_state();
// initialize with refusal
let mut resp = VoteResponse {
term: self.s.acceptor_state.term,
term: state.acceptor_state.term,
vote_given: false as u64,
flush_lsn: self.wal_store.flush_lsn(),
truncate_lsn: self.s.peer_horizon_lsn,
truncate_lsn: state.peer_horizon_lsn,
term_history: self.get_term_history(),
};
if self.s.acceptor_state.term < msg.term {
self.s.acceptor_state.term = msg.term;
// persist vote before sending it out
self.control_store.persist(&self.s)?;
resp.term = self.s.acceptor_state.term;
if state.acceptor_state.term < msg.term {
resp.term = msg.term;
resp.vote_given = true as u64;
// persist vote before sending it out
self.control_store.update_state(|state| {
state.acceptor_state.term = msg.term;
})?;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
@@ -640,19 +652,22 @@ where
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.s.acceptor_state.term < term {
self.s.acceptor_state.term = term;
self.control_store.persist(&self.s)?;
if self.control_store.persisted_state().acceptor_state.term < term {
self.control_store.update_state(|state| {
state.acceptor_state.term = term;
})?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let state = self.control_store.persisted_state();
let ar = AppendResponse {
term: self.s.acceptor_state.term,
term: state.acceptor_state.term,
flush_lsn: self.wal_store.flush_lsn(),
commit_lsn: self.s.commit_lsn,
commit_lsn: state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
zenith_feedback: ZenithFeedback::empty(),
@@ -665,7 +680,7 @@ where
info!("received ProposerElected {:?}", msg);
self.bump_if_higher(msg.term)?;
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.s.acceptor_state.term > msg.term {
if self.control_store.persisted_state().acceptor_state.term > msg.term {
return Ok(None);
}
@@ -676,20 +691,59 @@ where
self.wal_store.truncate_wal(msg.start_streaming_at)?;
// and now adopt term history from proposer
self.s.acceptor_state.term_history = msg.term_history.clone();
self.control_store.persist(&self.s)?;
self.control_store.update_state(|state| {
state.acceptor_state.term_history = msg.term_history.clone();
})?;
info!("start receiving WAL since {:?}", msg.start_streaming_at);
Ok(None)
}
/// Advance commit_lsn taking into account what we have locally
fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
let persisted_commit_lsn = self.control_store.persisted_state().commit_lsn;
assert!(commit_lsn >= persisted_commit_lsn);
self.commit_lsn = commit_lsn;
self.metrics.commit_lsn.set(self.commit_lsn.0 as f64);
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
// Also note that commit_lsn can reach epoch_start_lsn earlier
// than we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && persisted_commit_lsn != commit_lsn {
self.sync_control_file()?;
}
// We got our first commit_lsn, which means we should sync
// everything to disk, to initialize the state.
if self.s.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) {
self.wal_store.flush_wal()?;
self.sync_control_file()?;
}
Ok(())
}
/// Persist in-memory state to the disk.
fn sync_control_file(&mut self) -> Result<()> {
self.s.commit_lsn = self.commit_lsn;
self.s.peer_horizon_lsn = self.peer_horizon_lsn;
self.control_store.persist(&self.s)
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
fn handle_append_request(
&mut self,
msg: &AppendRequest,
mut require_flush: bool,
require_flush: bool,
) -> Result<Option<AcceptorProposerMessage>> {
if self.s.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
@@ -701,13 +755,11 @@ where
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
// After ProposerElected, which performs truncation, we should get only
// indeed append requests (but flush_lsn is advanced only on record
// boundary, so might be less).
assert!(self.wal_store.flush_lsn() <= msg.h.begin_lsn);
// Now we know that we are in the same term as the proposer,
// processing the message.
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.s.proposer_uuid = msg.h.proposer_uuid;
let mut sync_control_file = false;
// do the job
if !msg.wal_data.is_empty() {
@@ -716,10 +768,9 @@ where
// If this was the first record we ever received, initialize
// commit_lsn to help find_end_of_wal skip the hole in the
// beginning.
if self.s.commit_lsn == Lsn(0) {
self.s.commit_lsn = msg.h.begin_lsn;
sync_control_file = true;
require_flush = true;
if self.global_commit_lsn == Lsn(0) {
// TODO: can we be sure that first AppendRequest has at least one complete record?
self.global_commit_lsn = msg.h.begin_lsn;
}
}
@@ -728,35 +779,22 @@ where
self.wal_store.flush_wal()?;
}
// Advance commit_lsn taking into account what we have locally.
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case.
// Update global_commit_lsn, verifying that it cannot decrease.
if msg.h.commit_lsn != Lsn(0) {
let commit_lsn = min(msg.h.commit_lsn, self.wal_store.flush_lsn());
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
sync_control_file |= commit_lsn == msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.commit_lsn
.set(u64::from(self.commit_lsn) as f64);
assert!(msg.h.commit_lsn >= self.global_commit_lsn);
if msg.h.commit_lsn > self.global_commit_lsn {
self.global_commit_lsn = msg.h.commit_lsn;
}
}
self.peer_horizon_lsn = msg.h.truncate_lsn;
self.update_commit_lsn()?;
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
sync_control_file |=
self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn;
if sync_control_file {
self.s.commit_lsn = self.commit_lsn;
self.s.peer_horizon_lsn = self.peer_horizon_lsn;
}
if sync_control_file {
self.control_store.persist(&self.s)?;
if self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn {
self.sync_control_file()?;
}
trace!(
@@ -780,6 +818,10 @@ where
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?;
// commit_lsn can be updated because we have new flushed data locally.
self.update_commit_lsn()?;
Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(),
)))