Mirror of https://github.com/neondatabase/neon.git
WIP
@@ -309,10 +309,7 @@ impl WalResidentTimeline
         // lock and setting `wal_removal_on_hold` later, it guarantees that WAL
         // won't be removed until we're done.
         // TODO: do we still need this snapshot code path?
-        let from_lsn = min(
-            shared_state.sk.state().remote_consistent_lsn,
-            shared_state.sk.state().backup_lsn,
-        );
+        let from_lsn = shared_state.sk.state().backup_lsn;
         if from_lsn == Lsn::INVALID {
             // this is possible if snapshot is called before handling first
             // elected message

@@ -5,7 +5,7 @@ use crate::timeline_manager::StateSnapshot;
 /// Get oldest LSN we still need to keep.
 ///
 /// We hold WAL till it is consumed by
-/// 1) pageserver (remote_consistent_lsn)
+/// 1) pageserver (min_remote_consistent_lsn)
 /// 2) s3 offloading.
 /// 3) Additionally we must store WAL since last local commit_lsn because
 ///    that's where we start looking for last WAL record on start.
@@ -17,7 +17,7 @@ use crate::timeline_manager::StateSnapshot;
 pub(crate) fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn {
     use std::cmp::min;
 
-    let mut horizon_lsn = state.cfile_remote_consistent_lsn;
+    let mut horizon_lsn = state.min_remote_consistent_lsn;
     // we don't want to remove WAL that is not yet offloaded to s3
     horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
     // Min by local commit_lsn to be able to begin reading WAL from somewhere on
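Note on the hunk above: only the first clamp changes. Filling in the rest of calc_horizon_lsn around it, the whole computation would read roughly as follows; the later clamps (cfile_commit_lsn, cfile_peer_horizon_lsn, the extra_horizon_lsn parameter) are not shown in this diff and are assumptions, so treat this as a sketch rather than the final code.

pub(crate) fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn {
    use std::cmp::min;

    // Start from the minimum remote_consistent_lsn across pageserver generations.
    let mut horizon_lsn = state.min_remote_consistent_lsn;
    // We don't want to remove WAL that is not yet offloaded to s3.
    horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
    // Keep WAL since the last locally persisted commit_lsn so that on start
    // we can still find the last WAL record.
    horizon_lsn = min(horizon_lsn, state.cfile_commit_lsn);
    // Assumed field name: WAL that peers may still need.
    horizon_lsn = min(horizon_lsn, state.cfile_peer_horizon_lsn);
    // Respect an extra horizon pinned by the caller, if any.
    if let Some(extra_horizon_lsn) = extra_horizon_lsn {
        horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
    }
    horizon_lsn
}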
@@ -560,19 +560,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
             // Send a periodic keep alive when the connection has been idle for a while.
             // Since we've been idle, also check if we can stop streaming.
             _ = keepalive_ticker.tick() => {
-                if let Some(remote_consistent_lsn) = self.wal_sender_guard
-                    .walsenders()
-                    .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
-                {
-                    if self.tli.should_walsender_stop(remote_consistent_lsn).await {
-                        // Stop streaming if the receivers are caught up and
-                        // there's no active compute. This causes the loop in
-                        // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
-                        // to exit and terminate the WAL stream.
-                        break;
-                    }
-                }
-
                 self.pgb
                     .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
                         wal_end: self.end_watch_view.get().0,
@@ -251,17 +251,6 @@ impl WalSenders {
         shared.update_reply_feedback();
     }
 
-    /// Get remote_consistent_lsn reported by the pageserver. Returns None if
-    /// client is not pageserver.
-    pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
-        let shared = self.mutex.lock();
-        let slot = shared.get_slot(id);
-        match slot.get_feedback() {
-            ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
-            _ => None,
-        }
-    }
-
     /// Unregister walsender.
     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
         let mut shared = self.mutex.lock();
@@ -890,28 +879,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
                     return Ok(());
                 }
 
-                // Timed out waiting for WAL, check for termination and send KA.
-                // Check for termination only if we are streaming up to commit_lsn
-                // (to pageserver).
-                if let EndWatch::Commit(_) = self.end_watch {
-                    if let Some(remote_consistent_lsn) = self
-                        .ws_guard
-                        .walsenders
-                        .get_ws_remote_consistent_lsn(self.ws_guard.id)
-                    {
-                        if self.tli.should_walsender_stop(remote_consistent_lsn).await {
-                            // Terminate if there is nothing more to send.
-                            // Note that "ending streaming" part of the string is used by
-                            // pageserver to identify WalReceiverError::SuccessfulCompletion,
-                            // do not change this string without updating pageserver.
-                            return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
-                                "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
-                                self.appname, self.start_pos,
-                            )));
-                        }
-                    }
-                }
-
                 let msg = BeMessage::KeepAlive(WalSndKeepAlive {
                     wal_end: self.end_pos.0,
                     timestamp: get_current_timestamp(),
@@ -1020,7 +987,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
             .walsenders
             .record_ps_feedback(self.ws_guard.id, &ps_feedback);
         self.tli
-            .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn, ps_feedback.generation)
+            .process_remote_consistent_lsn_update(
+                ps_feedback.generation,
+                ps_feedback.remote_consistent_lsn,
+            )
             .await;
         // in principle new remote_consistent_lsn could allow to
         // deactivate the timeline, but we check that regularly through

@@ -61,10 +61,9 @@ pub struct TimelinePersistentState {
     /// walproposer proto called 'truncate_lsn'. Updates are currently drived
     /// only by walproposer.
     pub peer_horizon_lsn: Lsn,
-    /// LSN of the oldest known checkpoint made by pageserver and successfully
-    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
-    /// informational purposes, we receive it from pageserver (or broker).
-    pub remote_consistent_lsn: Lsn,
+    /// Obsolete; nowadays we track remote_consistent_lsn by generation number
+    /// in a separate cache with relaxed persistency requirements.
+    remote_consistent_lsn: Lsn,
     /// Holds names of partial segments uploaded to remote storage. Used to
     /// clean up old objects without leaving garbage in remote storage.
     pub partial_backup: wal_backup_partial::State,

@@ -215,7 +215,7 @@ impl StateSK {
             StateSK::Empty => unreachable!(),
         }
 
-        // update everything else, including remote_consistent_lsn and backup_lsn
+        // update everything else, including backup_lsn
         let mut sync_control_file = false;
         let state = self.state_mut();
         let wal_seg_size = state.server.wal_seg_size as u64;
@@ -873,6 +873,16 @@ impl Timeline {
     pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
         self.manager_ctl.backup_partial_reset().await
     }
+
+    pub async fn process_remote_consistent_lsn_update(
+        &self,
+        generation: Generation,
+        candidate: Lsn,
+    ) {
+        // TODO: still update controlfile state for backwards compat
+
+        todo!("implement & use the remote_persistent_lsn cache")
+    }
 }
 
 /// This is a guard that allows to read/write disk timeline state.
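The todo!() above is the heart of this WIP: remote_consistent_lsn moves out of the persistent control file into a cache keyed by pageserver generation. None of the names below exist in this commit; this is only a sketch of one plausible shape for that cache, monotonic per generation, with the minimum across generations feeding the new min_remote_consistent_lsn snapshot field. A plain u32 stands in for the Generation type used in the signature above.

use std::collections::HashMap;
use std::sync::Mutex;

use utils::lsn::Lsn;

/// Hypothetical per-generation remote_consistent_lsn cache (illustrative only).
#[derive(Default)]
pub struct RemoteConsistentLsnCache {
    by_generation: Mutex<HashMap<u32, Lsn>>,
}

impl RemoteConsistentLsnCache {
    /// Record a candidate for one generation, keeping the stored value monotonic.
    pub fn update(&self, generation: u32, candidate: Lsn) {
        let mut map = self.by_generation.lock().unwrap();
        let slot = map.entry(generation).or_insert(Lsn::INVALID);
        if candidate > *slot {
            *slot = candidate;
        }
    }

    /// Conservative minimum across generations; a natural source for the new
    /// min_remote_consistent_lsn snapshot field.
    pub fn min(&self) -> Lsn {
        let map = self.by_generation.lock().unwrap();
        map.values().copied().min().unwrap_or(Lsn::INVALID)
    }
}

Losing such a cache would be about as harmless as losing the now-obsolete control-file field: the safekeeper merely retains WAL a bit longer until the pageserver reports feedback again, which is presumably why the new doc comment speaks of "relaxed persistency requirements".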
@@ -897,23 +907,6 @@ impl Deref for WalResidentTimeline {
 }
 
 impl WalResidentTimeline {
-    /// Returns true if walsender should stop sending WAL to pageserver. We
-    /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
-    /// computes. While there might be nothing to stream already, we learn about
-    /// remote_consistent_lsn update through replication feedback, and we want
-    /// to stop pushing to the broker if pageserver is fully caughtup.
-    pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
-        if self.is_cancelled() {
-            return true;
-        }
-        let shared_state = self.read_shared_state().await;
-        if self.walreceivers.get_num() == 0 {
-            return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
-                reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
-        }
-        false
-    }
-
     /// Ensure that current term is t, erroring otherwise, and lock the state.
     pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
         let ss = self.read_shared_state().await;
@@ -965,11 +958,6 @@ impl WalResidentTimeline {
     pub fn get_timeline_dir(&self) -> Utf8PathBuf {
         self.timeline_dir.clone()
     }
-
-    /// Update in memory remote consistent lsn.
-    pub async fn update_remote_consistent_lsn(&self, candidate: Lsn, generation: Generation) {
-        todo!()
-    }
 }
 
 /// This struct contains methods that are used by timeline manager task.

@@ -47,6 +47,7 @@ pub(crate) struct StateSnapshot {
     // inmem values
     pub(crate) commit_lsn: Lsn,
     pub(crate) backup_lsn: Lsn,
+    pub(crate) min_remote_consistent_lsn: Lsn,
 
     // persistent control file values
     pub(crate) cfile_commit_lsn: Lsn,
@@ -58,7 +59,7 @@ pub(crate) struct StateSnapshot {
 
     // misc
     pub(crate) cfile_last_persist_at: std::time::Instant,
-    pub(crate) inmem_flush_pending: bool,
+    pub(crate) cfile_inmem_flush_pending: bool,
     pub(crate) wal_removal_on_hold: bool,
     pub(crate) peers: Vec<PeerInfo>,
 }
@@ -70,21 +71,23 @@ impl StateSnapshot {
         Self {
             commit_lsn: state.inmem.commit_lsn,
             backup_lsn: state.inmem.backup_lsn,
+            min_remote_consistent_lsn: todo!(""),
             cfile_commit_lsn: state.commit_lsn,
             cfile_backup_lsn: state.backup_lsn,
             flush_lsn: read_guard.sk.flush_lsn(),
             last_log_term: read_guard.sk.last_log_term(),
             cfile_last_persist_at: state.pers.last_persist_at(),
-            inmem_flush_pending: Self::has_unflushed_inmem_state(state),
+            cfile_inmem_flush_pending: Self::has_unflushed_cfile_inmem_state(state),
             wal_removal_on_hold: read_guard.wal_removal_on_hold,
             peers: read_guard.get_peers(heartbeat_timeout),
         }
     }
 
-    fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
+    fn has_unflushed_cfile_inmem_state(state: &TimelineState<FileStorage>) -> bool {
         state.inmem.commit_lsn > state.commit_lsn
             || state.inmem.backup_lsn > state.backup_lsn
             || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
+        // NB: remote_consistent_lsn storage is stored separately from control file
     }
 }
 
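The min_remote_consistent_lsn: todo!("") left in the snapshot constructor would presumably be filled from that same per-generation cache. A toy example of the intended behaviour, reusing the hypothetical RemoteConsistentLsnCache sketched earlier (values are made up):

fn example() {
    let cache = RemoteConsistentLsnCache::default();
    // Two pageserver generations report different progress.
    cache.update(1, Lsn(0x2000));
    cache.update(2, Lsn(0x1000));
    // The snapshot would take the conservative minimum, so WAL removal
    // (calc_horizon_lsn) and the broker-activity check below both lag
    // behind the slowest generation.
    assert_eq!(cache.min(), Lsn(0x1000));
}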
@@ -498,15 +501,14 @@ impl Manager {
     ) {
         let is_active = is_wal_backup_required
             || num_computes > 0
-            // TODO: replace with new facility
-            || state.remote_consistent_lsn < state.commit_lsn;
+            || state.min_remote_consistent_lsn < state.commit_lsn;
 
         // update the broker timeline set
         if self.tli_broker_active.set(is_active) {
             // write log if state has changed
             info!(
-                "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
-                is_active, state.remote_consistent_lsn, state.commit_lsn,
+                "timeline active={} now, min_remote_consistent_lsn={}, commit_lsn={}",
+                is_active, state.min_remote_consistent_lsn, state.commit_lsn,
             );
 
             MANAGER_ACTIVE_CHANGES.inc();
@@ -524,7 +526,7 @@ impl Manager {
         state: &StateSnapshot,
         next_event: &mut Option<Instant>,
     ) {
-        if !state.inmem_flush_pending {
+        if !state.cfile_inmem_flush_pending {
             return;
         }
 