From 8a901de52a270b8bf8a97a256527037fb0031276 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Sat, 12 Mar 2022 20:28:44 +0000 Subject: [PATCH] Refactor control file update at safekeeper. Record global_commit_lsn, have common routine for control file update, add SafekeeperMemstate. --- walkeeper/src/safekeeper.rs | 133 +++++++++++++++++++++++------------- walkeeper/src/timeline.rs | 4 +- 2 files changed, 87 insertions(+), 50 deletions(-) diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 53fd6f5588..8300b32b42 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -202,6 +202,14 @@ pub struct SafeKeeperState { pub peers: Peers, } +#[derive(Debug, Clone)] +// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; they are +// not flushed yet. +pub struct SafekeeperMemState { + pub commit_lsn: Lsn, + pub peer_horizon_lsn: Lsn, +} + impl SafeKeeperState { pub fn new(zttid: &ZTenantTimelineId, peers: Vec) -> SafeKeeperState { SafeKeeperState { @@ -470,14 +478,12 @@ struct SafeKeeperMetrics { } impl SafeKeeperMetrics { - fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, commit_lsn: Lsn) -> Self { + fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self { let tenant_id = tenant_id.to_string(); let timeline_id = timeline_id.to_string(); - let m = Self { + Self { commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), - }; - m.commit_lsn.set(u64::from(commit_lsn) as f64); - m + } } } @@ -487,9 +493,14 @@ pub struct SafeKeeper { // Cached metrics so we don't have to recompute labels on each update. metrics: SafeKeeperMetrics, - /// not-yet-flushed pairs of same named fields in s.* - pub commit_lsn: Lsn, - pub peer_horizon_lsn: Lsn, + /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn. + global_commit_lsn: Lsn, + /// LSN since the proposer safekeeper currently talking to appends WAL; + /// determines epoch switch point. + epoch_start_lsn: Lsn, + + pub inmem: SafekeeperMemState, // in memory part + pub s: SafeKeeperState, // persistent part pub control_store: CTRL, @@ -513,9 +524,13 @@ where } SafeKeeper { - metrics: SafeKeeperMetrics::new(state.tenant_id, ztli, state.commit_lsn), - commit_lsn: state.commit_lsn, - peer_horizon_lsn: state.peer_horizon_lsn, + metrics: SafeKeeperMetrics::new(state.tenant_id, ztli), + global_commit_lsn: state.commit_lsn, + epoch_start_lsn: Lsn(0), + inmem: SafekeeperMemState { + commit_lsn: state.commit_lsn, + peer_horizon_lsn: state.peer_horizon_lsn, + }, s: state, control_store, wal_store, @@ -602,9 +617,6 @@ where // pass wal_seg_size to read WAL and find flush_lsn self.wal_store.init_storage(&self.s)?; - // update tenant_id/timeline_id in metrics - self.metrics = SafeKeeperMetrics::new(msg.tenant_id, msg.ztli, self.commit_lsn); - info!( "processed greeting from proposer {:?}, sending term {:?}", msg.proposer_id, self.s.acceptor_state.term @@ -684,12 +696,49 @@ where Ok(None) } + /// Advance commit_lsn taking into account what we have locally + fn update_commit_lsn(&mut self) -> Result<()> { + let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn()); + assert!(commit_lsn >= self.inmem.commit_lsn); + + self.inmem.commit_lsn = commit_lsn; + self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64); + + // If new commit_lsn reached epoch switch, force sync of control + // file: walproposer in sync mode is very interested when this + // happens. Note: this is for sync-safekeepers mode only, as + // otherwise commit_lsn might jump over epoch_start_lsn. + // Also note that commit_lsn can reach epoch_start_lsn earlier + // that we receive new epoch_start_lsn, and we still need to sync + // control file in this case. + if commit_lsn == self.epoch_start_lsn && self.s.commit_lsn != commit_lsn { + self.persist_control_file()?; + } + + // We got our first commit_lsn, which means we should sync + // everything to disk, to initialize the state. + if self.s.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) { + self.wal_store.flush_wal()?; + self.persist_control_file()?; + } + + Ok(()) + } + + /// Persist in-memory state to the disk. + fn persist_control_file(&mut self) -> Result<()> { + self.s.commit_lsn = self.inmem.commit_lsn; + self.s.peer_horizon_lsn = self.inmem.peer_horizon_lsn; + + self.control_store.persist(&self.s) + } + /// Handle request to append WAL. #[allow(clippy::comparison_chain)] fn handle_append_request( &mut self, msg: &AppendRequest, - mut require_flush: bool, + require_flush: bool, ) -> Result> { if self.s.acceptor_state.term < msg.h.term { bail!("got AppendRequest before ProposerElected"); @@ -701,25 +750,22 @@ where return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } - // After ProposerElected, which performs truncation, we should get only - // indeed append requests (but flush_lsn is advanced only on record - // boundary, so might be less). - assert!(self.wal_store.flush_lsn() <= msg.h.begin_lsn); + // Now we know that we are in the same term as the proposer, + // processing the message. + self.epoch_start_lsn = msg.h.epoch_start_lsn; + // TODO: don't update state without persisting to disk self.s.proposer_uuid = msg.h.proposer_uuid; - let mut sync_control_file = false; // do the job if !msg.wal_data.is_empty() { self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; - // If this was the first record we ever receieved, initialize + // If this was the first record we ever received, initialize // commit_lsn to help find_end_of_wal skip the hole in the // beginning. - if self.s.commit_lsn == Lsn(0) { - self.s.commit_lsn = msg.h.begin_lsn; - sync_control_file = true; - require_flush = true; + if self.global_commit_lsn == Lsn(0) { + self.global_commit_lsn = msg.h.begin_lsn; } } @@ -728,35 +774,22 @@ where self.wal_store.flush_wal()?; } - // Advance commit_lsn taking into account what we have locally. - // commit_lsn can be 0, being unknown to new walproposer while he hasn't - // collected majority of its epoch acks yet, ignore it in this case. + // Update global_commit_lsn, verifying that it cannot decrease. if msg.h.commit_lsn != Lsn(0) { - let commit_lsn = min(msg.h.commit_lsn, self.wal_store.flush_lsn()); - // If new commit_lsn reached epoch switch, force sync of control - // file: walproposer in sync mode is very interested when this - // happens. Note: this is for sync-safekeepers mode only, as - // otherwise commit_lsn might jump over epoch_start_lsn. - sync_control_file |= commit_lsn == msg.h.epoch_start_lsn; - self.commit_lsn = commit_lsn; - self.metrics - .commit_lsn - .set(u64::from(self.commit_lsn) as f64); + assert!(msg.h.commit_lsn >= self.global_commit_lsn); + self.global_commit_lsn = msg.h.commit_lsn; } - self.peer_horizon_lsn = msg.h.truncate_lsn; + self.inmem.peer_horizon_lsn = msg.h.truncate_lsn; + self.update_commit_lsn()?; + // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only // when truncate_lsn delta exceeds WAL segment size. - sync_control_file |= - self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn; - if sync_control_file { - self.s.commit_lsn = self.commit_lsn; - self.s.peer_horizon_lsn = self.peer_horizon_lsn; - } - - if sync_control_file { - self.control_store.persist(&self.s)?; + if self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) + < self.inmem.peer_horizon_lsn + { + self.persist_control_file()?; } trace!( @@ -780,6 +813,10 @@ where /// Flush WAL to disk. Return AppendResponse with latest LSNs. fn handle_flush(&mut self) -> Result> { self.wal_store.flush_wal()?; + + // commit_lsn can be updated because we have new flushed data locally. + self.update_commit_lsn()?; + Ok(Some(AcceptorProposerMessage::AppendResponse( self.append_response(), ))) diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index ea8308b95e..b53f2e086b 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -340,7 +340,7 @@ impl Timeline { let replica_state = shared_state.replicas[replica_id].unwrap(); let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet (replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. - replica_state.last_received_lsn >= shared_state.sk.commit_lsn); + replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn); if deactivate { shared_state.deactivate(&self.zttid, callmemaybe_tx)?; return Ok(true); @@ -394,7 +394,7 @@ impl Timeline { rmsg = shared_state.sk.process_msg(msg)?; // locally available commit lsn. flush_lsn can be smaller than // commit_lsn if we are catching up safekeeper. - commit_lsn = shared_state.sk.commit_lsn; + commit_lsn = shared_state.sk.inmem.commit_lsn; // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {