Compare commits

...

3 Commits

fec3013fcd  Christian Schwarz  2025-02-28 22:10:08 +01:00
    WIP

4b04e3b55e  Christian Schwarz  2025-02-20 01:06:38 +01:00
    experiment: find all places that need the new rclsn-by-generation facility

    delete all the code that uses the current inmem & controlfile
    remote_consistent_lsn field

    After this removal, searching for `remote_consistent_lsn` highlights
    the places where the new solution that tracks rclsn per generation id
    needs to be fitted in

436178faee  Christian Schwarz  2025-02-20 01:05:38 +01:00
    report generation in PageserverFeedback (maybe we don't need this if
    we include it in the Offer RPC response?)
15 changed files with 46 additions and 139 deletions
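
The new per-generation facility itself is left as `todo!()` throughout this diff. For orientation, here is a minimal sketch of what a rclsn-by-generation cache could look like; the `RemoteConsistentLsnCache` name and its methods are hypothetical, not part of this diff, and it assumes `Generation` can serve as a map key:

```rust
use std::collections::HashMap;

use utils::generation::Generation;
use utils::lsn::Lsn;

/// Hypothetical cache tracking remote_consistent_lsn per pageserver
/// generation, stored separately from the control file with relaxed
/// persistence requirements (see the control_file changes below).
#[derive(Default)]
pub struct RemoteConsistentLsnCache {
    by_generation: HashMap<Generation, Lsn>,
}

impl RemoteConsistentLsnCache {
    /// Fold in a feedback value; rclsn only moves forward within a generation,
    /// so max() makes stale or reordered feedback harmless.
    pub fn update(&mut self, generation: Generation, candidate: Lsn) {
        let slot = self.by_generation.entry(generation).or_insert(Lsn::INVALID);
        *slot = std::cmp::max(*slot, candidate);
    }

    /// Minimum over all known generations: WAL below this LSN has been
    /// persisted by every generation we have heard from, so it is a safe
    /// input for WAL removal decisions. None if no feedback was seen yet.
    pub fn min(&self) -> Option<Lsn> {
        self.by_generation.values().copied().min()
    }
}
```

The sketches after the individual files below build on this type.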

View File

@@ -202,7 +202,6 @@ pub struct TimelineStatus {
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
- pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,
pub walreceivers: Vec<WalReceiverState>,

View File

@@ -5,7 +5,7 @@ use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use tracing::{trace, warn};
- use crate::lsn::Lsn;
+ use crate::{generation::Generation, lsn::Lsn};
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
@@ -32,6 +32,12 @@ pub struct PageserverFeedback {
pub replytime: SystemTime,
/// Used to track feedbacks from different shards. Always zero for unsharded tenants.
pub shard_number: u32,
+ /// The shard's pageserver-side generation number.
+ /// Used to track `remote_consistent_lsn` by generation, which is required
+ /// to determine whether
+ /// - WAL offers still need to be sent
+ /// - in future: whether WAL can be evicted and/or pruned
+ pub generation: Generation,
}
impl PageserverFeedback {
@@ -43,6 +49,7 @@ impl PageserverFeedback {
disk_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
shard_number: 0,
+ generation: Generation::none(),
}
}
@@ -101,6 +108,8 @@ impl PageserverFeedback {
buf.put_u32(self.shard_number);
}
+ todo!("ps_generation");
buf[buf_ptr] = nkeys;
}
@@ -147,6 +156,9 @@ impl PageserverFeedback {
assert_eq!(len, 4);
rf.shard_number = buf.get_u32();
}
b"ps_generation" => {
todo!();
}
_ => {
let len = buf.get_i32();
warn!(
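
Both halves of the `ps_generation` wiring are left as `todo!()` above. A sketch of the framing they would need, mirroring the existing `shard_number` key (NUL-terminated key name, i32 value length, fixed-size value); how `Generation` converts to and from a u32 on the wire is an assumption here, as is the helper naming:

```rust
use bytes::{Buf, BufMut, BytesMut};

// Serialize side: emit the key/value pair and remember to bump nkeys,
// exactly like the shard_number block above. `gen_u32` stands in for the
// generation's wire representation.
fn put_ps_generation(buf: &mut BytesMut, gen_u32: u32) {
    buf.put_slice(b"ps_generation\0"); // NUL-terminated key name
    buf.put_i32(4);                    // value length
    buf.put_u32(gen_u32);
}

// Deserialize side: the caller has already consumed the key via read_cstr()
// and matched on b"ps_generation".
fn get_ps_generation(buf: &mut impl Buf) -> u32 {
    let len = buf.get_i32();
    assert_eq!(len, 4);
    buf.get_u32()
}
```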

View File

@@ -266,7 +266,6 @@ components:
- flush_lsn
- commit_lsn
- backup_lsn
- - remote_consistent_lsn
- peer_horizon_lsn
- safekeeper_connstr
properties:
@@ -279,8 +278,6 @@ components:
type: string
backup_lsn:
type: string
- remote_consistent_lsn:
- type: string
peer_horizon_lsn:
type: string
safekeeper_connstr:
@@ -325,8 +322,6 @@ components:
type: string
peer_horizon_lsn:
type: string
- remote_consistent_lsn:
- type: string
AcceptorStateStatus:
type: object

View File

@@ -199,7 +199,6 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
commit_lsn: inmem.commit_lsn,
backup_lsn: inmem.backup_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
- remote_consistent_lsn: inmem.remote_consistent_lsn,
peers: tli.get_peers(conf).await,
walsenders: tli.get_walsenders().get_all_public(),
walreceivers: tli.get_walreceivers().get_all(),
@@ -456,7 +455,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
last_log_term: sk_info.last_log_term.unwrap_or(0),
flush_lsn: sk_info.flush_lsn.0,
commit_lsn: sk_info.commit_lsn.0,
- remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),

View File

@@ -471,7 +471,6 @@ pub struct TimelineCollector {
flush_lsn: GenericGaugeVec<AtomicU64>,
epoch_start_lsn: GenericGaugeVec<AtomicU64>,
peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
- remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
ps_feedback_count: GenericGaugeVec<AtomicU64>,
@@ -543,16 +542,6 @@ impl TimelineCollector {
.unwrap();
descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
- let remote_consistent_lsn = GenericGaugeVec::new(
- Opts::new(
- "safekeeper_remote_consistent_lsn",
- "LSN which is persisted to the remote storage in pageserver",
- ),
- &["tenant_id", "timeline_id"],
- )
- .unwrap();
- descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
let ps_last_received_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_ps_last_received_lsn",
@@ -698,7 +687,6 @@ impl TimelineCollector {
flush_lsn,
epoch_start_lsn,
peer_horizon_lsn,
- remote_consistent_lsn,
ps_last_received_lsn,
feedback_last_time_seconds,
ps_feedback_count,
@@ -732,7 +720,6 @@ impl Collector for TimelineCollector {
self.flush_lsn.reset();
self.epoch_start_lsn.reset();
self.peer_horizon_lsn.reset();
- self.remote_consistent_lsn.reset();
self.ps_last_received_lsn.reset();
self.feedback_last_time_seconds.reset();
self.ps_feedback_count.reset();
@@ -786,9 +773,6 @@ impl Collector for TimelineCollector {
self.peer_horizon_lsn
.with_label_values(labels)
.set(tli.mem_state.peer_horizon_lsn.into());
- self.remote_consistent_lsn
- .with_label_values(labels)
- .set(tli.mem_state.remote_consistent_lsn.into());
self.timeline_active
.with_label_values(labels)
.set(tli.timeline_is_active as u64);
@@ -849,7 +833,6 @@ impl Collector for TimelineCollector {
mfs.extend(self.flush_lsn.collect());
mfs.extend(self.epoch_start_lsn.collect());
mfs.extend(self.peer_horizon_lsn.collect());
- mfs.extend(self.remote_consistent_lsn.collect());
mfs.extend(self.ps_last_received_lsn.collect());
mfs.extend(self.feedback_last_time_seconds.collect());
mfs.extend(self.ps_feedback_count.collect());

View File

@@ -308,10 +308,8 @@ impl WalResidentTimeline {
// removed further than `backup_lsn`. Since we're holding shared_state
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
// won't be removed until we're done.
- let from_lsn = min(
- shared_state.sk.state().remote_consistent_lsn,
- shared_state.sk.state().backup_lsn,
- );
+ // TODO: do we still need this snapshot code path?
+ let from_lsn = shared_state.sk.state().backup_lsn;
if from_lsn == Lsn::INVALID {
// this is possible if snapshot is called before handling first
// elected message

View File

@@ -5,7 +5,7 @@ use crate::timeline_manager::StateSnapshot;
/// Get oldest LSN we still need to keep.
///
/// We hold WAL till it is consumed by
- /// 1) pageserver (remote_consistent_lsn)
+ /// 1) pageserver (min_remote_consistent_lsn)
/// 2) s3 offloading.
/// 3) Additionally we must store WAL since last local commit_lsn because
/// that's where we start looking for last WAL record on start.
@@ -17,7 +17,7 @@ use crate::timeline_manager::StateSnapshot;
pub(crate) fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn {
use std::cmp::min;
- let mut horizon_lsn = state.cfile_remote_consistent_lsn;
+ let mut horizon_lsn = state.min_remote_consistent_lsn;
// we don't want to remove WAL that is not yet offloaded to s3
horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
// Min by local commit_lsn to be able to begin reading WAL from somewhere on
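
With the control-file field gone, the horizon computation starts from the generation-wise minimum instead. A condensed, self-contained sketch of the retention rule described in the doc comment above (the struct is reduced to just the fields the rule needs; field names follow the diff):

```rust
use std::cmp::min;

use utils::lsn::Lsn;

/// Only the snapshot fields the horizon rule needs.
struct HorizonInputs {
    /// Minimum remote_consistent_lsn across pageserver generations.
    min_remote_consistent_lsn: Lsn,
    /// Offloaded-to-s3 watermark, from the control file.
    cfile_backup_lsn: Lsn,
    /// Last locally committed LSN persisted in the control file; WAL since
    /// here is needed to find the last record on restart.
    cfile_commit_lsn: Lsn,
}

/// WAL strictly below the returned LSN is no longer needed by any consumer.
fn calc_horizon_lsn(state: &HorizonInputs, extra_horizon_lsn: Option<Lsn>) -> Lsn {
    let mut horizon_lsn = state.min_remote_consistent_lsn;
    horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
    horizon_lsn = min(horizon_lsn, state.cfile_commit_lsn);
    if let Some(extra) = extra_horizon_lsn {
        horizon_lsn = min(horizon_lsn, extra);
    }
    horizon_lsn
}
```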

View File

@@ -560,19 +560,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
// Send a periodic keep alive when the connection has been idle for a while.
// Since we've been idle, also check if we can stop streaming.
_ = keepalive_ticker.tick() => {
- if let Some(remote_consistent_lsn) = self.wal_sender_guard
- .walsenders()
- .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
- {
- if self.tli.should_walsender_stop(remote_consistent_lsn).await {
- // Stop streaming if the receivers are caught up and
- // there's no active compute. This causes the loop in
- // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
- // to exit and terminate the WAL stream.
- break;
- }
- }
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_watch_view.get().0,

View File

@@ -251,17 +251,6 @@ impl WalSenders {
shared.update_reply_feedback();
}
- /// Get remote_consistent_lsn reported by the pageserver. Returns None if
- /// client is not pageserver.
- pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
- let shared = self.mutex.lock();
- let slot = shared.get_slot(id);
- match slot.get_feedback() {
- ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
- _ => None,
- }
- }
/// Unregister walsender.
fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
let mut shared = self.mutex.lock();
@@ -890,28 +879,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
return Ok(());
}
- // Timed out waiting for WAL, check for termination and send KA.
- // Check for termination only if we are streaming up to commit_lsn
- // (to pageserver).
- if let EndWatch::Commit(_) = self.end_watch {
- if let Some(remote_consistent_lsn) = self
- .ws_guard
- .walsenders
- .get_ws_remote_consistent_lsn(self.ws_guard.id)
- {
- if self.tli.should_walsender_stop(remote_consistent_lsn).await {
- // Terminate if there is nothing more to send.
- // Note that "ending streaming" part of the string is used by
- // pageserver to identify WalReceiverError::SuccessfulCompletion,
- // do not change this string without updating pageserver.
- return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
- "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
- self.appname, self.start_pos,
- )));
- }
- }
- }
let msg = BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
@@ -1020,7 +987,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
.walsenders
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
self.tli
- .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
+ .process_remote_consistent_lsn_update(
+ ps_feedback.generation,
+ ps_feedback.remote_consistent_lsn,
+ )
.await;
// in principle new remote_consistent_lsn could allow to
// deactivate the timeline, but we check that regularly through

View File

@@ -61,10 +61,9 @@ pub struct TimelinePersistentState {
/// walproposer proto called 'truncate_lsn'. Updates are currently drived
/// only by walproposer.
pub peer_horizon_lsn: Lsn,
- /// LSN of the oldest known checkpoint made by pageserver and successfully
- /// pushed to s3. We don't remove WAL beyond it. Persisted only for
- /// informational purposes, we receive it from pageserver (or broker).
- pub remote_consistent_lsn: Lsn,
+ /// Obsolete; nowadays we track remote_consistent_lsn by generation number
+ /// in a separate cache with relaxed persistence requirements.
+ remote_consistent_lsn: Lsn,
/// Holds names of partial segments uploaded to remote storage. Used to
/// clean up old objects without leaving garbage in remote storage.
pub partial_backup: wal_backup_partial::State,
@@ -171,7 +170,6 @@ pub struct TimelineMemState {
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
- pub remote_consistent_lsn: Lsn,
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
}
@@ -198,7 +196,6 @@ where
commit_lsn: state.commit_lsn,
backup_lsn: state.backup_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
- remote_consistent_lsn: state.remote_consistent_lsn,
proposer_uuid: state.proposer_uuid,
},
pers: state,
@@ -213,7 +210,6 @@ where
s.commit_lsn = self.inmem.commit_lsn;
s.backup_lsn = self.inmem.backup_lsn;
s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
- s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
s.proposer_uuid = self.inmem.proposer_uuid;
s
}
@@ -230,7 +226,6 @@ where
self.inmem.commit_lsn = s.commit_lsn;
self.inmem.backup_lsn = s.backup_lsn;
self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
- self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
self.inmem.proposer_uuid = s.proposer_uuid;
Ok(())
}

View File

@@ -11,6 +11,7 @@ use safekeeper_api::models::{
use safekeeper_api::Term;
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
+ use utils::generation::Generation;
use utils::id::TenantId;
use utils::sync::gate::Gate;
@@ -214,7 +215,7 @@ impl StateSK {
StateSK::Empty => unreachable!(),
}
- // update everything else, including remote_consistent_lsn and backup_lsn
+ // update everything else, including backup_lsn
let mut sync_control_file = false;
let state = self.state_mut();
let wal_seg_size = state.server.wal_seg_size as u64;
@@ -222,13 +223,6 @@ impl StateSK {
state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
- state.inmem.remote_consistent_lsn = max(
- Lsn(sk_info.remote_consistent_lsn),
- state.inmem.remote_consistent_lsn,
- );
- sync_control_file |=
- state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
state.inmem.peer_horizon_lsn =
max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
@@ -364,7 +358,6 @@ impl SharedState {
flush_lsn: self.sk.flush_lsn().0,
// note: this value is not flushed to control file yet and can be lost
commit_lsn: self.sk.state().inmem.commit_lsn.0,
- remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
safekeeper_connstr: conf
.advertise_pg_addr
@@ -880,6 +873,16 @@ impl Timeline {
pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
self.manager_ctl.backup_partial_reset().await
}
+ pub async fn process_remote_consistent_lsn_update(
+ &self,
+ generation: Generation,
+ candidate: Lsn,
+ ) {
+ // TODO: still update controlfile state for backwards compat
+ todo!("implement & use the remote_consistent_lsn cache")
+ }
}
/// This is a guard that allows to read/write disk timeline state.
@@ -904,23 +907,6 @@ impl Deref for WalResidentTimeline {
}
impl WalResidentTimeline {
- /// Returns true if walsender should stop sending WAL to pageserver. We
- /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
- /// computes. While there might be nothing to stream already, we learn about
- /// remote_consistent_lsn update through replication feedback, and we want
- /// to stop pushing to the broker if pageserver is fully caughtup.
- pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
- if self.is_cancelled() {
- return true;
- }
- let shared_state = self.read_shared_state().await;
- if self.walreceivers.get_num() == 0 {
- return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
- reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
- }
- false
- }
/// Ensure that current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
let ss = self.read_shared_state().await;
@@ -972,15 +958,6 @@ impl WalResidentTimeline {
pub fn get_timeline_dir(&self) -> Utf8PathBuf {
self.timeline_dir.clone()
}
- /// Update in memory remote consistent lsn.
- pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
- let mut shared_state = self.write_shared_state().await;
- shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
- shared_state.sk.state().inmem.remote_consistent_lsn,
- candidate,
- );
- }
}
/// This struct contains methods that are used by timeline manager task.
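
The deleted `should_walsender_stop` compared a single reported rclsn against `commit_lsn`; its generation-aware successor would presumably compare the minimum across generations instead. A sketch of that check as a pure function, mirroring the deleted logic above (the `is_cancelled()` early-return is omitted; inputs would come from the shared state and the hypothetical cache):

```rust
use utils::lsn::Lsn;

/// Stop streaming to the pageserver when no compute is connected and every
/// generation we have feedback from has persisted up to commit_lsn.
fn should_walsender_stop(num_walreceivers: usize, commit_lsn: Lsn, min_rclsn: Lsn) -> bool {
    if num_walreceivers > 0 {
        return false; // an active compute still needs the WAL stream
    }
    commit_lsn == Lsn(0) // no data at all yet
        || min_rclsn >= commit_lsn // all known generations are caught up
}
```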

View File

@@ -47,11 +47,10 @@ pub(crate) struct StateSnapshot {
// inmem values
pub(crate) commit_lsn: Lsn,
pub(crate) backup_lsn: Lsn,
- pub(crate) remote_consistent_lsn: Lsn,
+ pub(crate) min_remote_consistent_lsn: Lsn,
// persistent control file values
pub(crate) cfile_commit_lsn: Lsn,
- pub(crate) cfile_remote_consistent_lsn: Lsn,
pub(crate) cfile_backup_lsn: Lsn,
// latest state
@@ -60,7 +59,7 @@ pub(crate) struct StateSnapshot {
// misc
pub(crate) cfile_last_persist_at: std::time::Instant,
- pub(crate) inmem_flush_pending: bool,
+ pub(crate) cfile_inmem_flush_pending: bool,
pub(crate) wal_removal_on_hold: bool,
pub(crate) peers: Vec<PeerInfo>,
}
@@ -72,24 +71,23 @@ impl StateSnapshot {
Self {
commit_lsn: state.inmem.commit_lsn,
backup_lsn: state.inmem.backup_lsn,
- remote_consistent_lsn: state.inmem.remote_consistent_lsn,
+ min_remote_consistent_lsn: todo!(""),
cfile_commit_lsn: state.commit_lsn,
- cfile_remote_consistent_lsn: state.remote_consistent_lsn,
cfile_backup_lsn: state.backup_lsn,
flush_lsn: read_guard.sk.flush_lsn(),
last_log_term: read_guard.sk.last_log_term(),
cfile_last_persist_at: state.pers.last_persist_at(),
- inmem_flush_pending: Self::has_unflushed_inmem_state(state),
+ cfile_inmem_flush_pending: Self::has_unflushed_cfile_inmem_state(state),
wal_removal_on_hold: read_guard.wal_removal_on_hold,
peers: read_guard.get_peers(heartbeat_timeout),
}
}
- fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
+ fn has_unflushed_cfile_inmem_state(state: &TimelineState<FileStorage>) -> bool {
state.inmem.commit_lsn > state.commit_lsn
|| state.inmem.backup_lsn > state.backup_lsn
|| state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
- || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
+ // NB: remote_consistent_lsn is stored separately from the control file
}
}
@@ -503,14 +501,14 @@ impl Manager {
) {
let is_active = is_wal_backup_required
|| num_computes > 0
- || state.remote_consistent_lsn < state.commit_lsn;
+ || state.min_remote_consistent_lsn < state.commit_lsn;
// update the broker timeline set
if self.tli_broker_active.set(is_active) {
// write log if state has changed
info!(
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
is_active, state.remote_consistent_lsn, state.commit_lsn,
"timeline active={} now, min_remote_consistent_lsn={}, commit_lsn={}",
is_active, state.min_remote_consistent_lsn, state.commit_lsn,
);
MANAGER_ACTIVE_CHANGES.inc();
@@ -528,7 +526,7 @@ impl Manager {
state: &StateSnapshot,
next_event: &mut Option<Instant>,
) {
- if !state.inmem_flush_pending {
+ if !state.cfile_inmem_flush_pending {
return;
}
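
`min_remote_consistent_lsn` is still `todo!("")` in the snapshot constructor above. Per the horizon and activity checks, its intended value is the minimum over all per-generation entries; a sketch using the hypothetical cache from the top of this page, including the no-feedback edge case:

```rust
use utils::lsn::Lsn;

/// Derive the snapshot field from the hypothetical per-generation cache.
/// An empty cache (no feedback from any generation yet) maps to Lsn::INVALID,
/// i.e. 0, which keeps the timeline broker-active and holds all WAL;
/// conservative, like the old in-memory field before the first feedback.
fn snapshot_min_remote_consistent_lsn(cache: &RemoteConsistentLsnCache) -> Lsn {
    cache.min().unwrap_or(Lsn::INVALID)
}
```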

View File

@@ -38,8 +38,6 @@ message SafekeeperTimelineInfo {
uint64 commit_lsn = 5;
// LSN up to which safekeeper has backed WAL.
uint64 backup_lsn = 6;
- // LSN of last checkpoint uploaded by pageserver.
- uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
uint64 standby_horizon = 14;

View File

@@ -760,7 +760,6 @@ mod tests {
flush_lsn: 1,
commit_lsn: 2,
backup_lsn: 3,
- remote_consistent_lsn: 4,
peer_horizon_lsn: 5,
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
http_connstr: "neon-1-sk-1.local:7677".to_owned(),

View File

@@ -34,7 +34,6 @@ class SafekeeperTimelineStatus:
timeline_start_lsn: Lsn
backup_lsn: Lsn
peer_horizon_lsn: Lsn
- remote_consistent_lsn: Lsn
walreceivers: list[Walreceiver]
@@ -205,7 +204,6 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
backup_lsn=Lsn(resj["backup_lsn"]),
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
- remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
walreceivers=walreceivers,
)