Compare commits

...

1 Commits

Author SHA1 Message Date
Erik Grinaker
8dd8369a83 safekeeper: rename flush_lsn() to flush_record_lsn() 2024-11-17 18:33:51 +01:00
6 changed files with 22 additions and 20 deletions

View File

@@ -603,7 +603,10 @@ where
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
pub fn flush_lsn(&self) -> Lsn {
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
max(
self.wal_store.flush_record_lsn(),
self.state.timeline_start_lsn,
)
}
/// Process message from proposer and possibly form reply. Concurrent
@@ -828,7 +831,7 @@ where
//
// If we fail before first WAL write flush this action would be
// repeated, that's ok because it is idempotent.
if self.wal_store.flush_lsn() == Lsn::INVALID {
if self.wal_store.flush_record_lsn() == Lsn::INVALID {
self.wal_store
.initialize_first_segment(msg.start_streaming_at)
.await?;
@@ -947,7 +950,7 @@ where
// while first connection still gets some packets later. It might be
// better to not log this as error! above.
let write_lsn = self.wal_store.write_lsn();
let flush_lsn = self.wal_store.flush_lsn();
let flush_lsn = self.wal_store.flush_record_lsn();
if write_lsn > msg.h.begin_lsn {
bail!(
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
@@ -1087,7 +1090,7 @@ mod tests {
self.lsn
}
fn flush_lsn(&self) -> Lsn {
fn flush_record_lsn(&self) -> Lsn {
self.lsn
}

View File

@@ -176,7 +176,7 @@ pub enum StateSK {
impl StateSK {
pub fn flush_lsn(&self) -> Lsn {
match self {
StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
StateSK::Loaded(sk) => sk.wal_store.flush_record_lsn(),
StateSK::Offloaded(state) => match state.eviction_state {
EvictionState::Offloaded(flush_lsn) => flush_lsn,
_ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
@@ -1108,11 +1108,11 @@ impl ManagerTimeline {
);
}
if wal_store.flush_lsn() != shared.sk.flush_lsn() {
if wal_store.flush_record_lsn() != shared.sk.flush_lsn() {
bail!(
"flush_lsn mismatch in restored WAL, expected {}, got {}",
shared.sk.flush_lsn(),
wal_store.flush_lsn()
wal_store.flush_record_lsn()
);
}

View File

@@ -599,7 +599,7 @@ pub async fn validate_temp_timeline(
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
let commit_lsn = control_store.commit_lsn;
let flush_lsn = wal_store.flush_lsn();
let flush_lsn = wal_store.flush_record_lsn();
Ok((commit_lsn, flush_lsn))
}

View File

@@ -35,10 +35,10 @@ use pq_proto::SystemId;
use utils::{id::TenantTimelineId, lsn::Lsn};
pub trait Storage {
// Last written LSN.
/// Last written LSN.
fn write_lsn(&self) -> Lsn;
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// End LSN of last durably stored WAL record.
fn flush_record_lsn(&self) -> Lsn;
/// Initialize segment by creating proper long header at the beginning of
/// the segment and short header at the page of given LSN. This is only used
@@ -116,11 +116,13 @@ pub struct PhysicalStorage {
/// The last LSN flushed to disk. May be in the middle of a record.
///
/// NB: when the rest of the system refers to `flush_lsn`, it usually
/// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
/// and should be resolved.
/// means `flush_record_lsn`. This `flush_lsn` is only used internally.
flush_lsn: Lsn,
/// The LSN of the last WAL record flushed to disk.
///
/// NB: when the rest of the system refers to `flush_lsn`, it usually
/// means `flush_record_lsn`.
flush_record_lsn: Lsn,
/// Decoder is required for detecting boundaries of WAL records.
@@ -387,11 +389,8 @@ impl Storage for PhysicalStorage {
fn write_lsn(&self) -> Lsn {
self.write_lsn
}
/// flush_lsn returns LSN of last durably stored WAL record.
///
/// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
#[allow(clippy::misnamed_getters)]
fn flush_lsn(&self) -> Lsn {
/// End LSN of the last durably stored WAL record.
fn flush_record_lsn(&self) -> Lsn {
self.flush_record_lsn
}

View File

@@ -178,7 +178,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
let mut conns: HashMap<usize, ConnState> = HashMap::new();
for (&_ttid, shared_state) in global.timelines.iter_mut() {
let flush_lsn = shared_state.sk.wal_store.flush_lsn();
let flush_lsn = shared_state.sk.wal_store.flush_record_lsn();
let commit_lsn = shared_state.sk.state.commit_lsn;
os.log_event(format!("tli_loaded;{};{}", flush_lsn.0, commit_lsn.0));
}

View File

@@ -180,7 +180,7 @@ impl wal_storage::Storage for DiskWALStorage {
self.write_lsn
}
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
fn flush_record_lsn(&self) -> Lsn {
self.flush_record_lsn
}