mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
experiment: find all places that need new rclsn-by-generation facility
delete all the code that uses current inmem & controlfile remote_consistent_lsn field After this removal, search for `remote_consistent_lsn` highlights the places where the new solution that tracks rclsn per generation id needs to be fitted in
This commit is contained in:
@@ -202,7 +202,6 @@ pub struct TimelineStatus {
|
||||
pub commit_lsn: Lsn,
|
||||
pub backup_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
pub walsenders: Vec<WalSenderState>,
|
||||
pub walreceivers: Vec<WalReceiverState>,
|
||||
|
||||
@@ -266,7 +266,6 @@ components:
|
||||
- flush_lsn
|
||||
- commit_lsn
|
||||
- backup_lsn
|
||||
- remote_consistent_lsn
|
||||
- peer_horizon_lsn
|
||||
- safekeeper_connstr
|
||||
properties:
|
||||
@@ -279,8 +278,6 @@ components:
|
||||
type: string
|
||||
backup_lsn:
|
||||
type: string
|
||||
remote_consistent_lsn:
|
||||
type: string
|
||||
peer_horizon_lsn:
|
||||
type: string
|
||||
safekeeper_connstr:
|
||||
@@ -325,8 +322,6 @@ components:
|
||||
type: string
|
||||
peer_horizon_lsn:
|
||||
type: string
|
||||
remote_consistent_lsn:
|
||||
type: string
|
||||
|
||||
AcceptorStateStatus:
|
||||
type: object
|
||||
|
||||
@@ -199,7 +199,6 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
commit_lsn: inmem.commit_lsn,
|
||||
backup_lsn: inmem.backup_lsn,
|
||||
peer_horizon_lsn: inmem.peer_horizon_lsn,
|
||||
remote_consistent_lsn: inmem.remote_consistent_lsn,
|
||||
peers: tli.get_peers(conf).await,
|
||||
walsenders: tli.get_walsenders().get_all_public(),
|
||||
walreceivers: tli.get_walreceivers().get_all(),
|
||||
@@ -456,7 +455,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
last_log_term: sk_info.last_log_term.unwrap_or(0),
|
||||
flush_lsn: sk_info.flush_lsn.0,
|
||||
commit_lsn: sk_info.commit_lsn.0,
|
||||
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
|
||||
@@ -471,7 +471,6 @@ pub struct TimelineCollector {
|
||||
flush_lsn: GenericGaugeVec<AtomicU64>,
|
||||
epoch_start_lsn: GenericGaugeVec<AtomicU64>,
|
||||
peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
|
||||
remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
|
||||
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
|
||||
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
|
||||
ps_feedback_count: GenericGaugeVec<AtomicU64>,
|
||||
@@ -543,16 +542,6 @@ impl TimelineCollector {
|
||||
.unwrap();
|
||||
descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
|
||||
|
||||
let remote_consistent_lsn = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_remote_consistent_lsn",
|
||||
"LSN which is persisted to the remote storage in pageserver",
|
||||
),
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
|
||||
|
||||
let ps_last_received_lsn = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_ps_last_received_lsn",
|
||||
@@ -698,7 +687,6 @@ impl TimelineCollector {
|
||||
flush_lsn,
|
||||
epoch_start_lsn,
|
||||
peer_horizon_lsn,
|
||||
remote_consistent_lsn,
|
||||
ps_last_received_lsn,
|
||||
feedback_last_time_seconds,
|
||||
ps_feedback_count,
|
||||
@@ -732,7 +720,6 @@ impl Collector for TimelineCollector {
|
||||
self.flush_lsn.reset();
|
||||
self.epoch_start_lsn.reset();
|
||||
self.peer_horizon_lsn.reset();
|
||||
self.remote_consistent_lsn.reset();
|
||||
self.ps_last_received_lsn.reset();
|
||||
self.feedback_last_time_seconds.reset();
|
||||
self.ps_feedback_count.reset();
|
||||
@@ -786,9 +773,6 @@ impl Collector for TimelineCollector {
|
||||
self.peer_horizon_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.mem_state.peer_horizon_lsn.into());
|
||||
self.remote_consistent_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.mem_state.remote_consistent_lsn.into());
|
||||
self.timeline_active
|
||||
.with_label_values(labels)
|
||||
.set(tli.timeline_is_active as u64);
|
||||
@@ -849,7 +833,6 @@ impl Collector for TimelineCollector {
|
||||
mfs.extend(self.flush_lsn.collect());
|
||||
mfs.extend(self.epoch_start_lsn.collect());
|
||||
mfs.extend(self.peer_horizon_lsn.collect());
|
||||
mfs.extend(self.remote_consistent_lsn.collect());
|
||||
mfs.extend(self.ps_last_received_lsn.collect());
|
||||
mfs.extend(self.feedback_last_time_seconds.collect());
|
||||
mfs.extend(self.ps_feedback_count.collect());
|
||||
|
||||
@@ -308,6 +308,7 @@ impl WalResidentTimeline {
|
||||
// removed further than `backup_lsn`. Since we're holding shared_state
|
||||
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
|
||||
// won't be removed until we're done.
|
||||
// TODO: do we still need this snapshot code path?
|
||||
let from_lsn = min(
|
||||
shared_state.sk.state().remote_consistent_lsn,
|
||||
shared_state.sk.state().backup_lsn,
|
||||
|
||||
@@ -171,7 +171,6 @@ pub struct TimelineMemState {
|
||||
pub commit_lsn: Lsn,
|
||||
pub backup_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
#[serde(with = "hex")]
|
||||
pub proposer_uuid: PgUuid,
|
||||
}
|
||||
@@ -198,7 +197,6 @@ where
|
||||
commit_lsn: state.commit_lsn,
|
||||
backup_lsn: state.backup_lsn,
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
proposer_uuid: state.proposer_uuid,
|
||||
},
|
||||
pers: state,
|
||||
@@ -213,7 +211,6 @@ where
|
||||
s.commit_lsn = self.inmem.commit_lsn;
|
||||
s.backup_lsn = self.inmem.backup_lsn;
|
||||
s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
||||
s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
|
||||
s.proposer_uuid = self.inmem.proposer_uuid;
|
||||
s
|
||||
}
|
||||
@@ -230,7 +227,6 @@ where
|
||||
self.inmem.commit_lsn = s.commit_lsn;
|
||||
self.inmem.backup_lsn = s.backup_lsn;
|
||||
self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
|
||||
self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
|
||||
self.inmem.proposer_uuid = s.proposer_uuid;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -223,13 +223,6 @@ impl StateSK {
|
||||
state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
|
||||
sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
|
||||
|
||||
state.inmem.remote_consistent_lsn = max(
|
||||
Lsn(sk_info.remote_consistent_lsn),
|
||||
state.inmem.remote_consistent_lsn,
|
||||
);
|
||||
sync_control_file |=
|
||||
state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
|
||||
|
||||
state.inmem.peer_horizon_lsn =
|
||||
max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
|
||||
sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
|
||||
@@ -365,7 +358,6 @@ impl SharedState {
|
||||
flush_lsn: self.sk.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.state().inmem.commit_lsn.0,
|
||||
remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: conf
|
||||
.advertise_pg_addr
|
||||
@@ -976,11 +968,7 @@ impl WalResidentTimeline {
|
||||
|
||||
/// Update in memory remote consistent lsn.
|
||||
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn, generation: Generation) {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
|
||||
shared_state.sk.state().inmem.remote_consistent_lsn,
|
||||
candidate,
|
||||
);
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,11 +47,9 @@ pub(crate) struct StateSnapshot {
|
||||
// inmem values
|
||||
pub(crate) commit_lsn: Lsn,
|
||||
pub(crate) backup_lsn: Lsn,
|
||||
pub(crate) remote_consistent_lsn: Lsn,
|
||||
|
||||
// persistent control file values
|
||||
pub(crate) cfile_commit_lsn: Lsn,
|
||||
pub(crate) cfile_remote_consistent_lsn: Lsn,
|
||||
pub(crate) cfile_backup_lsn: Lsn,
|
||||
|
||||
// latest state
|
||||
@@ -72,9 +70,7 @@ impl StateSnapshot {
|
||||
Self {
|
||||
commit_lsn: state.inmem.commit_lsn,
|
||||
backup_lsn: state.inmem.backup_lsn,
|
||||
remote_consistent_lsn: state.inmem.remote_consistent_lsn,
|
||||
cfile_commit_lsn: state.commit_lsn,
|
||||
cfile_remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
cfile_backup_lsn: state.backup_lsn,
|
||||
flush_lsn: read_guard.sk.flush_lsn(),
|
||||
last_log_term: read_guard.sk.last_log_term(),
|
||||
@@ -89,7 +85,6 @@ impl StateSnapshot {
|
||||
state.inmem.commit_lsn > state.commit_lsn
|
||||
|| state.inmem.backup_lsn > state.backup_lsn
|
||||
|| state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
|
||||
|| state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
|
||||
}
|
||||
}
|
||||
|
||||
@@ -503,6 +498,7 @@ impl Manager {
|
||||
) {
|
||||
let is_active = is_wal_backup_required
|
||||
|| num_computes > 0
|
||||
// TODO: replace with new facility
|
||||
|| state.remote_consistent_lsn < state.commit_lsn;
|
||||
|
||||
// update the broker timeline set
|
||||
|
||||
@@ -38,8 +38,6 @@ message SafekeeperTimelineInfo {
|
||||
uint64 commit_lsn = 5;
|
||||
// LSN up to which safekeeper has backed WAL.
|
||||
uint64 backup_lsn = 6;
|
||||
// LSN of last checkpoint uploaded by pageserver.
|
||||
uint64 remote_consistent_lsn = 7;
|
||||
uint64 peer_horizon_lsn = 8;
|
||||
uint64 local_start_lsn = 9;
|
||||
uint64 standby_horizon = 14;
|
||||
|
||||
@@ -760,7 +760,6 @@ mod tests {
|
||||
flush_lsn: 1,
|
||||
commit_lsn: 2,
|
||||
backup_lsn: 3,
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
|
||||
@@ -34,7 +34,6 @@ class SafekeeperTimelineStatus:
|
||||
timeline_start_lsn: Lsn
|
||||
backup_lsn: Lsn
|
||||
peer_horizon_lsn: Lsn
|
||||
remote_consistent_lsn: Lsn
|
||||
walreceivers: list[Walreceiver]
|
||||
|
||||
|
||||
@@ -205,7 +204,6 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
|
||||
backup_lsn=Lsn(resj["backup_lsn"]),
|
||||
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
|
||||
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
|
||||
walreceivers=walreceivers,
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user