diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 2cd1accdb5..a409a0054c 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -309,10 +309,7 @@ impl WalResidentTimeline { // lock and setting `wal_removal_on_hold` later, it guarantees that WAL // won't be removed until we're done. // TODO: do we still need this snapshot code path? - let from_lsn = min( - shared_state.sk.state().remote_consistent_lsn, - shared_state.sk.state().backup_lsn, - ); + let from_lsn = shared_state.sk.state().backup_lsn; if from_lsn == Lsn::INVALID { // this is possible if snapshot is called before handling first // elected message diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index df3ba9eb08..3d49952105 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -5,7 +5,7 @@ use crate::timeline_manager::StateSnapshot; /// Get oldest LSN we still need to keep. /// /// We hold WAL till it is consumed by -/// 1) pageserver (remote_consistent_lsn) +/// 1) pageserver (min_remote_consistent_lsn) /// 2) s3 offloading. /// 3) Additionally we must store WAL since last local commit_lsn because /// that's where we start looking for last WAL record on start. 
@@ -17,7 +17,7 @@ use crate::timeline_manager::StateSnapshot; pub(crate) fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn { use std::cmp::min; - let mut horizon_lsn = state.cfile_remote_consistent_lsn; + let mut horizon_lsn = state.min_remote_consistent_lsn; // we don't want to remove WAL that is not yet offloaded to s3 horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn); // Min by local commit_lsn to be able to begin reading WAL from somewhere on diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index 5916675c3f..482c38fd41 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -560,19 +560,6 @@ impl InterpretedWalSender<'_, IO> { // Send a periodic keep alive when the connection has been idle for a while. // Since we've been idle, also check if we can stop streaming. _ = keepalive_ticker.tick() => { - if let Some(remote_consistent_lsn) = self.wal_sender_guard - .walsenders() - .get_ws_remote_consistent_lsn(self.wal_sender_guard.id()) - { - if self.tli.should_walsender_stop(remote_consistent_lsn).await { - // Stop streaming if the receivers are caught up and - // there's no active compute. This causes the loop in - // [`crate::send_interpreted_wal::InterpretedWalSender::run`] - // to exit and terminate the WAL stream. - break; - } - } - self.pgb .write_message(&BeMessage::KeepAlive(WalSndKeepAlive { wal_end: self.end_watch_view.get().0, diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 66584f7814..1be13acee9 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -251,17 +251,6 @@ impl WalSenders { shared.update_reply_feedback(); } - /// Get remote_consistent_lsn reported by the pageserver. Returns None if - /// client is not pageserver. 
- pub fn get_ws_remote_consistent_lsn(self: &Arc<Self>, id: WalSenderId) -> Option<Lsn> { - let shared = self.mutex.lock(); - let slot = shared.get_slot(id); - match slot.get_feedback() { - ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn), - _ => None, - } - } - /// Unregister walsender. fn unregister(self: &Arc<Self>, id: WalSenderId) { let mut shared = self.mutex.lock(); @@ -890,28 +879,6 @@ impl WalSender<'_, IO> { return Ok(()); } - // Timed out waiting for WAL, check for termination and send KA. - // Check for termination only if we are streaming up to commit_lsn - // (to pageserver). - if let EndWatch::Commit(_) = self.end_watch { - if let Some(remote_consistent_lsn) = self - .ws_guard - .walsenders - .get_ws_remote_consistent_lsn(self.ws_guard.id) - { - if self.tli.should_walsender_stop(remote_consistent_lsn).await { - // Terminate if there is nothing more to send. - // Note that "ending streaming" part of the string is used by - // pageserver to identify WalReceiverError::SuccessfulCompletion, - // do not change this string without updating pageserver. 
- return Err(CopyStreamHandlerEnd::ServerInitiated(format!( - "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", - self.appname, self.start_pos, - ))); - } - } - } - let msg = BeMessage::KeepAlive(WalSndKeepAlive { wal_end: self.end_pos.0, timestamp: get_current_timestamp(), @@ -1020,7 +987,10 @@ impl ReplyReader { .walsenders .record_ps_feedback(self.ws_guard.id, &ps_feedback); self.tli - .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn, ps_feedback.generation) + .process_remote_consistent_lsn_update( + ps_feedback.generation, + ps_feedback.remote_consistent_lsn, + ) .await; // in principle new remote_consistent_lsn could allow to // deactivate the timeline, but we check that regularly through diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index e8c4d47161..98f460859b 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -61,10 +61,9 @@ pub struct TimelinePersistentState { /// walproposer proto called 'truncate_lsn'. Updates are currently drived /// only by walproposer. pub peer_horizon_lsn: Lsn, - /// LSN of the oldest known checkpoint made by pageserver and successfully - /// pushed to s3. We don't remove WAL beyond it. Persisted only for - /// informational purposes, we receive it from pageserver (or broker). - pub remote_consistent_lsn: Lsn, + /// Obsolete; nowadays we track remote_consistent_lsn by generation number + /// in a separate cache with relaxed persistency requirements. + remote_consistent_lsn: Lsn, /// Holds names of partial segments uploaded to remote storage. Used to /// clean up old objects without leaving garbage in remote storage. 
pub partial_backup: wal_backup_partial::State, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 55aee7ec50..9c3bcce648 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -215,7 +215,7 @@ impl StateSK { StateSK::Empty => unreachable!(), } - // update everything else, including remote_consistent_lsn and backup_lsn + // update everything else, including backup_lsn let mut sync_control_file = false; let state = self.state_mut(); let wal_seg_size = state.server.wal_seg_size as u64; @@ -873,6 +873,16 @@ impl Timeline { pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> { self.manager_ctl.backup_partial_reset().await } + + pub async fn process_remote_consistent_lsn_update( + &self, + generation: Generation, + candidate: Lsn, + ) { + // TODO: still update controlfile state for backwards compat + + todo!("implement & use the remote_consistent_lsn cache") + } } /// This is a guard that allows to read/write disk timeline state. @@ -897,23 +907,6 @@ impl Deref for WalResidentTimeline { } impl WalResidentTimeline { - /// Returns true if walsender should stop sending WAL to pageserver. We - /// terminate it if remote_consistent_lsn reached commit_lsn and there is no - /// computes. While there might be nothing to stream already, we learn about - /// remote_consistent_lsn update through replication feedback, and we want - /// to stop pushing to the broker if pageserver is fully caughtup. - pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { - if self.is_cancelled() { - return true; - } - let shared_state = self.read_shared_state().await; - if self.walreceivers.get_num() == 0 { - return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet - reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn; - } - false - } - /// Ensure that current term is t, erroring otherwise, and lock the state. 
pub async fn acquire_term(&self, t: Term) -> Result { let ss = self.read_shared_state().await; @@ -965,11 +958,6 @@ impl WalResidentTimeline { pub fn get_timeline_dir(&self) -> Utf8PathBuf { self.timeline_dir.clone() } - - /// Update in memory remote consistent lsn. - pub async fn update_remote_consistent_lsn(&self, candidate: Lsn, generation: Generation) { - todo!() - } } /// This struct contains methods that are used by timeline manager task. diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 07960ee742..fb65fbd299 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -47,6 +47,7 @@ pub(crate) struct StateSnapshot { // inmem values pub(crate) commit_lsn: Lsn, pub(crate) backup_lsn: Lsn, + pub(crate) min_remote_consistent_lsn: Lsn, // persistent control file values pub(crate) cfile_commit_lsn: Lsn, @@ -58,7 +59,7 @@ pub(crate) struct StateSnapshot { // misc pub(crate) cfile_last_persist_at: std::time::Instant, - pub(crate) inmem_flush_pending: bool, + pub(crate) cfile_inmem_flush_pending: bool, pub(crate) wal_removal_on_hold: bool, pub(crate) peers: Vec, } @@ -70,21 +71,23 @@ impl StateSnapshot { Self { commit_lsn: state.inmem.commit_lsn, backup_lsn: state.inmem.backup_lsn, + min_remote_consistent_lsn: todo!(""), cfile_commit_lsn: state.commit_lsn, cfile_backup_lsn: state.backup_lsn, flush_lsn: read_guard.sk.flush_lsn(), last_log_term: read_guard.sk.last_log_term(), cfile_last_persist_at: state.pers.last_persist_at(), - inmem_flush_pending: Self::has_unflushed_inmem_state(state), + cfile_inmem_flush_pending: Self::has_unflushed_cfile_inmem_state(state), wal_removal_on_hold: read_guard.wal_removal_on_hold, peers: read_guard.get_peers(heartbeat_timeout), } } - fn has_unflushed_inmem_state(state: &TimelineState) -> bool { + fn has_unflushed_cfile_inmem_state(state: &TimelineState) -> bool { state.inmem.commit_lsn > state.commit_lsn || state.inmem.backup_lsn > state.backup_lsn || 
state.inmem.peer_horizon_lsn > state.peer_horizon_lsn + // NB: remote_consistent_lsn storage is stored separately from control file } } @@ -498,15 +501,14 @@ impl Manager { ) { let is_active = is_wal_backup_required || num_computes > 0 - // TODO: replace with new facility - || state.remote_consistent_lsn < state.commit_lsn; + || state.min_remote_consistent_lsn < state.commit_lsn; // update the broker timeline set if self.tli_broker_active.set(is_active) { // write log if state has changed info!( - "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}", - is_active, state.remote_consistent_lsn, state.commit_lsn, + "timeline active={} now, min_remote_consistent_lsn={}, commit_lsn={}", + is_active, state.min_remote_consistent_lsn, state.commit_lsn, ); MANAGER_ACTIVE_CHANGES.inc(); @@ -524,7 +526,7 @@ impl Manager { state: &StateSnapshot, next_event: &mut Option, ) { - if !state.inmem_flush_pending { + if !state.cfile_inmem_flush_pending { return; }