Refactor control file updates in the safekeeper.

Record global_commit_lsn, add a common routine for control file updates, and
introduce SafekeeperMemState.
Arthur Petukhovsky
2022-03-12 20:28:44 +00:00
committed by Arseny Sher
parent a883202495
commit 8a901de52a
2 changed files with 87 additions and 50 deletions
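
For orientation before the diff: the sketch below condenses the new shape into a small, self-contained Rust program. Mutable LSNs live in an in-memory struct mirroring the persistent control-file state, the local commit_lsn advances as min(global_commit_lsn, flush_lsn), and a single routine copies the in-memory values over before flushing. All names here (Lsn as a bare u64 newtype, State, MemState, flush_lsn as a plain field) are simplified stand-ins for illustration, not the actual safekeeper API.

    use std::cmp::min;

    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct Lsn(u64);

    #[derive(Debug, Clone)]
    struct State {
        commit_lsn: Lsn,
        peer_horizon_lsn: Lsn,
    }

    #[derive(Debug, Clone)]
    struct MemState {
        commit_lsn: Lsn,
        peer_horizon_lsn: Lsn,
    }

    struct SafeKeeper {
        global_commit_lsn: Lsn, // max commit_lsn across nodes; may exceed local flush_lsn
        flush_lsn: Lsn,         // how far the local WAL is durably flushed
        inmem: MemState,        // not-yet-persisted values
        state: State,           // what the control file currently holds
    }

    impl SafeKeeper {
        /// The local commit_lsn is capped by what is durably on local disk.
        fn update_commit_lsn(&mut self) {
            let commit_lsn = min(self.global_commit_lsn, self.flush_lsn);
            assert!(commit_lsn >= self.inmem.commit_lsn); // must never move backwards
            self.inmem.commit_lsn = commit_lsn;
        }

        /// The single place where in-memory values reach the control file.
        fn persist_control_file(&mut self) {
            self.state.commit_lsn = self.inmem.commit_lsn;
            self.state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
            // a real implementation would rewrite and fsync the control file here
            println!("persisted: {:?}", self.state);
        }
    }

    fn main() {
        let zero = Lsn(0);
        let mut sk = SafeKeeper {
            global_commit_lsn: Lsn(300), // peers are ahead of us
            flush_lsn: Lsn(200),
            inmem: MemState { commit_lsn: zero, peer_horizon_lsn: zero },
            state: State { commit_lsn: zero, peer_horizon_lsn: zero },
        };
        sk.update_commit_lsn();
        assert_eq!(sk.inmem.commit_lsn, Lsn(200)); // capped by local flush_lsn
        sk.persist_control_file();
    }

Funneling every control-file write through one copy-then-flush routine is the "common routine for control file update" the commit message refers to.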


@@ -202,6 +202,14 @@ pub struct SafeKeeperState {
     pub peers: Peers,
 }
 
+#[derive(Debug, Clone)]
+// In-memory safekeeper state. Fields mirror those in `SafeKeeperState`; they
+// are not flushed yet.
+pub struct SafekeeperMemState {
+    pub commit_lsn: Lsn,
+    pub peer_horizon_lsn: Lsn,
+}
+
 impl SafeKeeperState {
     pub fn new(zttid: &ZTenantTimelineId, peers: Vec<ZNodeId>) -> SafeKeeperState {
         SafeKeeperState {
@@ -470,14 +478,12 @@ struct SafeKeeperMetrics {
 }
 
 impl SafeKeeperMetrics {
-    fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, commit_lsn: Lsn) -> Self {
+    fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
         let tenant_id = tenant_id.to_string();
         let timeline_id = timeline_id.to_string();
-        let m = Self {
+        Self {
             commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
-        };
-        m.commit_lsn.set(u64::from(commit_lsn) as f64);
-        m
+        }
     }
 }
@@ -487,9 +493,14 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     // Cached metrics so we don't have to recompute labels on each update.
     metrics: SafeKeeperMetrics,
 
-    /// not-yet-flushed pairs of same named fields in s.*
-    pub commit_lsn: Lsn,
-    pub peer_horizon_lsn: Lsn,
+    /// Maximum commit_lsn across all nodes; can be ahead of the local flush_lsn.
+    global_commit_lsn: Lsn,
+    /// LSN from which the proposer we are currently talking to appends WAL;
+    /// determines the epoch switch point.
+    epoch_start_lsn: Lsn,
+
+    pub inmem: SafekeeperMemState, // in-memory part
     pub s: SafeKeeperState, // persistent part
 
     pub control_store: CTRL,
@@ -513,9 +524,13 @@ where
         }
 
         SafeKeeper {
-            metrics: SafeKeeperMetrics::new(state.tenant_id, ztli, state.commit_lsn),
-            commit_lsn: state.commit_lsn,
-            peer_horizon_lsn: state.peer_horizon_lsn,
+            metrics: SafeKeeperMetrics::new(state.tenant_id, ztli),
+            global_commit_lsn: state.commit_lsn,
+            epoch_start_lsn: Lsn(0),
+            inmem: SafekeeperMemState {
+                commit_lsn: state.commit_lsn,
+                peer_horizon_lsn: state.peer_horizon_lsn,
+            },
             s: state,
             control_store,
             wal_store,
@@ -602,9 +617,6 @@ where
         // pass wal_seg_size to read WAL and find flush_lsn
         self.wal_store.init_storage(&self.s)?;
 
-        // update tenant_id/timeline_id in metrics
-        self.metrics = SafeKeeperMetrics::new(msg.tenant_id, msg.ztli, self.commit_lsn);
-
         info!(
             "processed greeting from proposer {:?}, sending term {:?}",
             msg.proposer_id, self.s.acceptor_state.term
@@ -684,12 +696,49 @@ where
         Ok(None)
     }
 
+    /// Advance commit_lsn, taking into account what we have locally.
+    fn update_commit_lsn(&mut self) -> Result<()> {
+        let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
+        assert!(commit_lsn >= self.inmem.commit_lsn);
+        self.inmem.commit_lsn = commit_lsn;
+        self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
+
+        // If the new commit_lsn reached the epoch switch, force a sync of the
+        // control file: walproposer in sync mode is very interested when this
+        // happens. Note: this is for sync-safekeepers mode only, as
+        // otherwise commit_lsn might jump over epoch_start_lsn.
+        // Also note that commit_lsn can reach epoch_start_lsn earlier
+        // than we receive the new epoch_start_lsn, and we still need to sync
+        // the control file in this case.
+        if commit_lsn == self.epoch_start_lsn && self.s.commit_lsn != commit_lsn {
+            self.persist_control_file()?;
+        }
+
+        // We got our first commit_lsn, which means we should sync
+        // everything to disk, to initialize the state.
+        if self.s.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) {
+            self.wal_store.flush_wal()?;
+            self.persist_control_file()?;
+        }
+
+        Ok(())
+    }
+
+    /// Persist in-memory state to disk.
+    fn persist_control_file(&mut self) -> Result<()> {
+        self.s.commit_lsn = self.inmem.commit_lsn;
+        self.s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
+        self.control_store.persist(&self.s)
+    }
+
     /// Handle request to append WAL.
     #[allow(clippy::comparison_chain)]
     fn handle_append_request(
         &mut self,
         msg: &AppendRequest,
-        mut require_flush: bool,
+        require_flush: bool,
     ) -> Result<Option<AcceptorProposerMessage>> {
         if self.s.acceptor_state.term < msg.h.term {
             bail!("got AppendRequest before ProposerElected");
@@ -701,25 +750,22 @@ where
             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
         }
 
         // After ProposerElected, which performs truncation, we should get only
         // genuine append requests (but flush_lsn is advanced only on record
         // boundaries, so it might be less).
         assert!(self.wal_store.flush_lsn() <= msg.h.begin_lsn);
 
         // Now we know that we are in the same term as the proposer,
         // processing the message.
+        self.epoch_start_lsn = msg.h.epoch_start_lsn;
 
         // TODO: don't update state without persisting to disk
         self.s.proposer_uuid = msg.h.proposer_uuid;
-        let mut sync_control_file = false;
 
         // do the job
         if !msg.wal_data.is_empty() {
             self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
 
-            // If this was the first record we ever receieved, initialize
+            // If this was the first record we ever received, initialize
             // commit_lsn to help find_end_of_wal skip the hole in the
             // beginning.
-            if self.s.commit_lsn == Lsn(0) {
-                self.s.commit_lsn = msg.h.begin_lsn;
-                sync_control_file = true;
-                require_flush = true;
+            if self.global_commit_lsn == Lsn(0) {
+                self.global_commit_lsn = msg.h.begin_lsn;
             }
         }
@@ -728,35 +774,22 @@
             self.wal_store.flush_wal()?;
         }
 
-        // Advance commit_lsn taking into account what we have locally.
-        // commit_lsn can be 0, being unknown to a new walproposer while it hasn't
-        // collected a majority of epoch acks yet; ignore it in this case.
+        // Update global_commit_lsn, verifying that it cannot decrease.
         if msg.h.commit_lsn != Lsn(0) {
-            let commit_lsn = min(msg.h.commit_lsn, self.wal_store.flush_lsn());
-            // If new commit_lsn reached epoch switch, force sync of control
-            // file: walproposer in sync mode is very interested when this
-            // happens. Note: this is for sync-safekeepers mode only, as
-            // otherwise commit_lsn might jump over epoch_start_lsn.
-            sync_control_file |= commit_lsn == msg.h.epoch_start_lsn;
-            self.commit_lsn = commit_lsn;
-            self.metrics
-                .commit_lsn
-                .set(u64::from(self.commit_lsn) as f64);
+            assert!(msg.h.commit_lsn >= self.global_commit_lsn);
+            self.global_commit_lsn = msg.h.commit_lsn;
         }
 
-        self.peer_horizon_lsn = msg.h.truncate_lsn;
+        self.inmem.peer_horizon_lsn = msg.h.truncate_lsn;
+        self.update_commit_lsn()?;
 
         // Update truncate and commit LSN in control file.
         // To avoid hurting performance with an extra fsync, do it only
         // when the truncate_lsn delta exceeds the WAL segment size.
-        sync_control_file |=
-            self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn;
-        if sync_control_file {
-            self.s.commit_lsn = self.commit_lsn;
-            self.s.peer_horizon_lsn = self.peer_horizon_lsn;
-        }
-        if sync_control_file {
-            self.control_store.persist(&self.s)?;
+        if self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64)
+            < self.inmem.peer_horizon_lsn
+        {
+            self.persist_control_file()?;
         }
 
         trace!(
@@ -780,6 +813,10 @@ where
     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
         self.wal_store.flush_wal()?;
+
+        // commit_lsn can be updated because we have new flushed data locally.
+        self.update_commit_lsn()?;
+
         Ok(Some(AcceptorProposerMessage::AppendResponse(
             self.append_response(),
         )))
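
One detail worth calling out from the hunks above: the ad-hoc sync_control_file flag is gone, and the remaining periodic trigger is a batching heuristic. The control file is rewritten only once peer_horizon_lsn has run ahead of the persisted value by more than one WAL segment, so steady-state appends don't pay an fsync per message. A minimal sketch of that check; the constant and the numbers are invented for illustration:

    // Sketch of the fsync-batching check from handle_append_request:
    // persist only when the in-memory peer_horizon_lsn has advanced past
    // the persisted one by more than a WAL segment.

    const WAL_SEG_SIZE: u64 = 16 * 1024 * 1024; // 16 MiB, a common default

    fn needs_persist(persisted_peer_horizon: u64, inmem_peer_horizon: u64) -> bool {
        persisted_peer_horizon + WAL_SEG_SIZE < inmem_peer_horizon
    }

    fn main() {
        let persisted = 100 * 1024 * 1024;
        // advanced by 1 MiB: not worth an fsync yet
        assert!(!needs_persist(persisted, persisted + 1024 * 1024));
        // advanced by just over a segment: time to persist
        assert!(needs_persist(persisted, persisted + WAL_SEG_SIZE + 1));
    }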


@@ -340,7 +340,7 @@ impl Timeline {
             let replica_state = shared_state.replicas[replica_id].unwrap();
             let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
                 (replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
-                replica_state.last_received_lsn >= shared_state.sk.commit_lsn);
+                replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn);
             if deactivate {
                 shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
                 return Ok(true);
@@ -394,7 +394,7 @@ impl Timeline {
                 rmsg = shared_state.sk.process_msg(msg)?;
 
                 // locally available commit lsn. flush_lsn can be smaller than
                 // commit_lsn if we are catching up the safekeeper.
-                commit_lsn = shared_state.sk.commit_lsn;
+                commit_lsn = shared_state.sk.inmem.commit_lsn;
                 // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
                 if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
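
Both timeline.rs call sites switch from sk.commit_lsn to sk.inmem.commit_lsn, i.e. readers consult the freshest in-memory value rather than whatever happens to be persisted. A small sketch of why that matters for the deactivation check above; the struct and the numbers are invented:

    // Between control-file syncs, the in-memory commit_lsn can run ahead
    // of the persisted one.

    struct Sk {
        inmem_commit_lsn: u64,     // updated on every append/flush
        persisted_commit_lsn: u64, // updated only when the control file is synced
    }

    fn replica_caught_up(sk: &Sk, last_received_lsn: u64) -> bool {
        // compare against the freshest value; using the persisted one could
        // keep a fully caught-up replica marked as lagging until the next sync
        last_received_lsn >= sk.inmem_commit_lsn
    }

    fn main() {
        let sk = Sk {
            inmem_commit_lsn: 250,
            persisted_commit_lsn: 200,
        };
        // the persisted value lags the in-memory one between syncs
        assert!(sk.persisted_commit_lsn <= sk.inmem_commit_lsn);
        assert!(replica_caught_up(&sk, 250));
        assert!(!replica_caught_up(&sk, 220));
    }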