mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-03 20:20:38 +00:00
Fix commit_lsn calculations
This commit is contained in:
@@ -470,14 +470,12 @@ struct SafeKeeperMetrics {
|
||||
}
|
||||
|
||||
impl SafeKeeperMetrics {
|
||||
fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, commit_lsn: Lsn) -> Self {
|
||||
fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
|
||||
let tenant_id = tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
let m = Self {
|
||||
Self {
|
||||
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
};
|
||||
m.commit_lsn.set(u64::from(commit_lsn) as f64);
|
||||
m
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -487,10 +485,17 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
||||
// Cached metrics so we don't have to recompute labels on each update.
|
||||
metrics: SafeKeeperMetrics,
|
||||
|
||||
/// not-yet-flushed pairs of same named fields in s.*
|
||||
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
|
||||
global_commit_lsn: Lsn,
|
||||
/// LSN since the proposer appends WAL; determines epoch switch point.
|
||||
epoch_start_lsn: Lsn,
|
||||
|
||||
// not-yet-flushed pairs of same named fields in s.*
|
||||
pub commit_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub s: SafeKeeperState, // persistent part
|
||||
|
||||
// persisted to disk
|
||||
pub s: SafeKeeperState,
|
||||
|
||||
pub control_store: CTRL,
|
||||
pub wal_store: WAL,
|
||||
@@ -513,7 +518,9 @@ where
|
||||
}
|
||||
|
||||
SafeKeeper {
|
||||
metrics: SafeKeeperMetrics::new(state.tenant_id, ztli, state.commit_lsn),
|
||||
metrics: SafeKeeperMetrics::new(state.tenant_id, ztli),
|
||||
global_commit_lsn: state.commit_lsn,
|
||||
epoch_start_lsn: Lsn(0),
|
||||
commit_lsn: state.commit_lsn,
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
s: state,
|
||||
@@ -602,9 +609,6 @@ where
|
||||
// pass wal_seg_size to read WAL and find flush_lsn
|
||||
self.wal_store.init_storage(&self.s)?;
|
||||
|
||||
// update tenant_id/timeline_id in metrics
|
||||
self.metrics = SafeKeeperMetrics::new(msg.tenant_id, msg.ztli, self.commit_lsn);
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
msg.proposer_id, self.s.acceptor_state.term
|
||||
@@ -684,12 +688,49 @@ where
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Advance commit_lsn taking into account what we have locally
///
/// The candidate value is the smaller of `global_commit_lsn` and the WAL
/// store's current `flush_lsn()`. If the in-memory `commit_lsn` is already
/// at or beyond the candidate, nothing is changed and `Ok(())` is returned,
/// so this function never moves `commit_lsn` backwards.
///
/// Otherwise the in-memory `commit_lsn` and the cached metrics gauge are
/// updated, and the control file is synced in two cases: when the new value
/// equals `epoch_start_lsn`, and when the previous value was `Lsn(0)` (in
/// which case the WAL is flushed first).
///
/// # Errors
///
/// Propagates any error returned by `wal_store.flush_wal()` or
/// `sync_control_file()`.
///
/// NOTE(review): when the previous `commit_lsn` was `Lsn(0)` and the new value
/// also equals `epoch_start_lsn`, `sync_control_file()` is called twice in one
/// invocation. Confirm whether the extra persist is intended.
|
||||
fn update_commit_lsn(&mut self) -> Result<()> {
|
||||
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
|
||||
if self.commit_lsn >= commit_lsn {
|
||||
return Ok(());
|
||||
}
|
||||
let commit_lsn_was_zero = self.commit_lsn == Lsn(0);
|
||||
|
||||
self.commit_lsn = commit_lsn;
|
||||
self.metrics.commit_lsn.set(self.commit_lsn.0 as f64);
|
||||
|
||||
// If new commit_lsn reached epoch switch, force sync of control
|
||||
// file: walproposer in sync mode is very interested when this
|
||||
// happens. Note: this is for sync-safekeepers mode only, as
|
||||
// otherwise commit_lsn might jump over epoch_start_lsn.
|
||||
if commit_lsn == self.epoch_start_lsn {
|
||||
self.sync_control_file()?;
|
||||
}
|
||||
|
||||
// We got our first commit_lsn, which means we should sync
|
||||
// everything to disk, to initialize the state.
|
||||
if commit_lsn_was_zero {
|
||||
self.wal_store.flush_wal()?;
|
||||
self.sync_control_file()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persist in-memory state to the disk.
///
/// Copies the in-memory `commit_lsn` and `peer_horizon_lsn` into the
/// persistent state `s`, then hands `s` to `control_store.persist()`.
///
/// # Errors
///
/// Returns whatever error `control_store.persist()` reports. The fields of
/// `s` have already been overwritten by then, so on failure `s` holds the
/// new values even though the persist call did not succeed.
|
||||
fn sync_control_file(&mut self) -> Result<()> {
|
||||
self.s.commit_lsn = self.commit_lsn;
|
||||
self.s.peer_horizon_lsn = self.peer_horizon_lsn;
|
||||
|
||||
self.control_store.persist(&self.s)
|
||||
}
|
||||
|
||||
/// Handle request to append WAL.
|
||||
#[allow(clippy::comparison_chain)]
|
||||
fn handle_append_request(
|
||||
&mut self,
|
||||
msg: &AppendRequest,
|
||||
mut require_flush: bool,
|
||||
require_flush: bool,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
if self.s.acceptor_state.term < msg.h.term {
|
||||
bail!("got AppendRequest before ProposerElected");
|
||||
@@ -701,13 +742,11 @@ where
|
||||
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
|
||||
}
|
||||
|
||||
// After ProposerElected, which performs truncation, we should get only
|
||||
// indeed append requests (but flush_lsn is advanced only on record
|
||||
// boundary, so might be less).
|
||||
assert!(self.wal_store.flush_lsn() <= msg.h.begin_lsn);
|
||||
// Now we know that we are in the same term as the proposer,
|
||||
// processing the message.
|
||||
|
||||
self.epoch_start_lsn = msg.h.epoch_start_lsn;
|
||||
self.s.proposer_uuid = msg.h.proposer_uuid;
|
||||
let mut sync_control_file = false;
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
@@ -716,10 +755,9 @@ where
|
||||
// If this was the first record we ever received, initialize
|
||||
// commit_lsn to help find_end_of_wal skip the hole in the
|
||||
// beginning.
|
||||
if self.s.commit_lsn == Lsn(0) {
|
||||
self.s.commit_lsn = msg.h.begin_lsn;
|
||||
sync_control_file = true;
|
||||
require_flush = true;
|
||||
if self.global_commit_lsn == Lsn(0) {
|
||||
// TODO: can we be sure that first AppendRequest has at least one complete record?
|
||||
self.global_commit_lsn = msg.h.begin_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -728,35 +766,22 @@ where
|
||||
self.wal_store.flush_wal()?;
|
||||
}
|
||||
|
||||
// Advance commit_lsn taking into account what we have locally.
|
||||
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
|
||||
// collected majority of its epoch acks yet, ignore it in this case.
|
||||
// Update global_commit_lsn, verifying that it cannot decrease.
|
||||
if msg.h.commit_lsn != Lsn(0) {
|
||||
let commit_lsn = min(msg.h.commit_lsn, self.wal_store.flush_lsn());
|
||||
// If new commit_lsn reached epoch switch, force sync of control
|
||||
// file: walproposer in sync mode is very interested when this
|
||||
// happens. Note: this is for sync-safekeepers mode only, as
|
||||
// otherwise commit_lsn might jump over epoch_start_lsn.
|
||||
sync_control_file |= commit_lsn == msg.h.epoch_start_lsn;
|
||||
self.commit_lsn = commit_lsn;
|
||||
self.metrics
|
||||
.commit_lsn
|
||||
.set(u64::from(self.commit_lsn) as f64);
|
||||
assert!(msg.h.commit_lsn >= self.global_commit_lsn);
|
||||
if msg.h.commit_lsn > self.global_commit_lsn {
|
||||
self.global_commit_lsn = msg.h.commit_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
self.peer_horizon_lsn = msg.h.truncate_lsn;
|
||||
self.update_commit_lsn()?;
|
||||
|
||||
// Update truncate and commit LSN in control file.
|
||||
// To avoid negative impact on performance of extra fsync, do it only
|
||||
// when truncate_lsn delta exceeds WAL segment size.
|
||||
sync_control_file |=
|
||||
self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn;
|
||||
if sync_control_file {
|
||||
self.s.commit_lsn = self.commit_lsn;
|
||||
self.s.peer_horizon_lsn = self.peer_horizon_lsn;
|
||||
}
|
||||
|
||||
if sync_control_file {
|
||||
self.control_store.persist(&self.s)?;
|
||||
if self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn {
|
||||
self.sync_control_file()?;
|
||||
}
|
||||
|
||||
trace!(
|
||||
@@ -780,6 +805,10 @@ where
|
||||
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
|
||||
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
|
||||
self.wal_store.flush_wal()?;
|
||||
|
||||
// commit_lsn can be updated because we have new flushed data locally.
|
||||
self.update_commit_lsn()?;
|
||||
|
||||
Ok(Some(AcceptorProposerMessage::AppendResponse(
|
||||
self.append_response(),
|
||||
)))
|
||||
|
||||
Reference in New Issue
Block a user