Propagate reply_flush_lsn from SK to PS to prevent GC from collecting objects that may still be requested by a replica

Konstantin Knizhnik
2024-01-15 15:20:50 +02:00
parent f3d7d23805
commit 8cfc5ade57
9 changed files with 83 additions and 18 deletions
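
In short, the change threads a replica's flush LSN through the whole stack. A sketch of the data path, stitched together from the hunks below (step numbering is mine):

// 1. A standby's StandbyReply (carrying its flush_lsn) arrives at a safekeeper walsender.
// 2. WalSendersShared::update_reply_feedback() aggregates the minimum valid
//    flush_lsn across all standbys into agg_standby_feedback.
// 3. On each AppendResponse the safekeeper copies reply.flush_lsn into
//    TimelineMemState::standby_flush_lsn and publishes it to the storage
//    broker via SafekeeperTimelineInfo.
// 4. The pageserver's walreceiver connection manager stores the broker value
//    into Timeline::standby_flush_lsn (an AtomicLsn).
// 5. GC clamps new_gc_cutoff to that LSN, so layers the replica may still
//    request are kept.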

View File

@@ -50,6 +50,8 @@ pub struct SkTimelineInfo {
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub http_connstr: Option<String>,
+ #[serde(default = "lsn_invalid")]
+ pub standby_flush_lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
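
The serde default keeps the wire format backward compatible: a payload from an older safekeeper that omits the field still deserializes. A hypothetical check (assuming lsn_invalid() returns Lsn::INVALID and the remaining fields are present in old_payload):

let info: SkTimelineInfo = serde_json::from_str(old_payload)?;
// The missing field falls back to lsn_invalid(); the pageserver GC path
// treats Lsn::INVALID as "no standby to protect".
assert_eq!(info.standby_flush_lsn, Lsn::INVALID);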

View File

@@ -215,6 +215,8 @@ pub struct Timeline {
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
+ pub(crate) standby_flush_lsn: AtomicLsn,
// WAL redo manager. `None` only for broken tenants.
walredo_mgr: Option<Arc<super::WalRedoManager>>,
@@ -1540,6 +1542,7 @@ impl Timeline {
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),
+ standby_flush_lsn: AtomicLsn::new(0),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4198,7 +4201,11 @@ impl Timeline {
(horizon_cutoff, pitr_cutoff, retain_lsns)
};
- let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
+ let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
+ let standby_flush_lsn = self.standby_flush_lsn.load();
+ if standby_flush_lsn != Lsn::INVALID && standby_flush_lsn < new_gc_cutoff {
+     new_gc_cutoff = standby_flush_lsn;
+ }
let res = self
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
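
A worked example of the new clamp, with illustrative values:

// horizon and PITR alone would allow GC up to 0/4000 ...
let horizon_cutoff = Lsn(0x5000);
let pitr_cutoff = Lsn(0x4000);
let standby_flush_lsn = Lsn(0x3000); // a replica has only flushed up to here
let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); // 0/4000
if standby_flush_lsn != Lsn::INVALID && standby_flush_lsn < new_gc_cutoff {
    new_gc_cutoff = standby_flush_lsn; // ... but GC is held back to 0/3000
}
assert_eq!(new_gc_cutoff, Lsn(0x3000));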

View File

@@ -553,6 +553,10 @@ impl ConnectionManagerState {
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
WALRECEIVER_BROKER_UPDATES.inc();
+ self.timeline
+     .standby_flush_lsn
+     .store(Lsn(timeline_update.standby_flush_lsn));
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(
new_safekeeper_id,
@@ -561,7 +565,6 @@ impl ConnectionManagerState {
latest_update: Utc::now().naive_utc(),
},
);
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
WALRECEIVER_CANDIDATES_ADDED.inc();
@@ -920,6 +923,7 @@ mod tests {
remote_consistent_lsn: 0,
peer_horizon_lsn: 0,
local_start_lsn: 0,
+ standby_flush_lsn: 0,
safekeeper_connstr: safekeeper_connstr.to_owned(),
http_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
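
Storing the value in an AtomicLsn lets the broker-update path (writer) and the GC task (reader) share it without taking the timeline lock. A minimal illustration of the pattern, not code from this commit:

let standby_flush_lsn = AtomicLsn::new(0);
standby_flush_lsn.store(Lsn(0x3000)); // walreceiver, on each broker update
assert_eq!(standby_flush_lsn.load(), Lsn(0x3000)); // GC task, at cutoff time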

View File

@@ -350,6 +350,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,
+ standby_flush_lsn: sk_info.standby_flush_lsn.0,
};
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;

View File

@@ -83,8 +83,17 @@ impl StandbyReply {
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
- reply: StandbyReply,
- hs_feedback: HotStandbyFeedback,
+ pub reply: StandbyReply,
+ pub hs_feedback: HotStandbyFeedback,
}
+ impl StandbyFeedback {
+     pub fn empty() -> Self {
+         StandbyFeedback {
+             reply: StandbyReply::empty(),
+             hs_feedback: HotStandbyFeedback::empty(),
+         }
+     }
+ }
/// WalSenders registry. Timeline holds it (wrapped in Arc).
@@ -142,9 +151,9 @@ impl WalSenders {
}
/// Get aggregated pageserver and hot standby feedback (we send them to compute).
- pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
+ pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, StandbyFeedback) {
let shared = self.mutex.lock();
- (shared.agg_ps_feedback, shared.agg_hs_feedback)
+ (shared.agg_ps_feedback, shared.agg_standby_feedback)
}
/// Record new pageserver feedback, update aggregated values.
@@ -182,7 +191,7 @@ impl WalSenders {
})
}
}
- shared.update_hs_feedback();
+ shared.update_reply_feedback();
}
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
@@ -200,13 +209,13 @@ impl WalSenders {
fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
- shared.update_hs_feedback();
+ shared.update_reply_feedback();
}
}
struct WalSendersShared {
// aggregated over all walsenders value
- agg_hs_feedback: HotStandbyFeedback,
+ agg_standby_feedback: StandbyFeedback,
// aggregated over all walsenders value
agg_ps_feedback: PageserverFeedback,
slots: Vec<Option<WalSenderState>>,
@@ -215,7 +224,7 @@ struct WalSendersShared {
impl WalSendersShared {
fn new() -> Self {
WalSendersShared {
- agg_hs_feedback: HotStandbyFeedback::empty(),
+ agg_standby_feedback: StandbyFeedback::empty(),
agg_ps_feedback: PageserverFeedback::empty(),
slots: Vec::new(),
}
@@ -233,8 +242,9 @@ impl WalSendersShared {
/// Update aggregated hot standby feedback. We just take the min of valid
/// xmins and ts.
- fn update_hs_feedback(&mut self) {
+ fn update_reply_feedback(&mut self) {
let mut agg = HotStandbyFeedback::empty();
+ let mut reply_agg = StandbyReply::empty();
for ws_state in self.slots.iter().flatten() {
if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
let hs_feedback = standby_feedback.hs_feedback;
@@ -257,9 +267,41 @@ impl WalSendersShared {
}
agg.ts = min(agg.ts, hs_feedback.ts);
}
+ let reply = standby_feedback.reply;
+ if reply.write_lsn != Lsn::INVALID {
+     if reply_agg.write_lsn != Lsn::INVALID {
+         reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
+     } else {
+         reply_agg.write_lsn = reply.write_lsn;
+     }
+ }
+ if reply.flush_lsn != Lsn::INVALID {
+     if reply_agg.flush_lsn != Lsn::INVALID {
+         reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
+     } else {
+         reply_agg.flush_lsn = reply.flush_lsn;
+     }
+ }
+ if reply.apply_lsn != Lsn::INVALID {
+     if reply_agg.apply_lsn != Lsn::INVALID {
+         reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
+     } else {
+         reply_agg.apply_lsn = reply.apply_lsn;
+     }
+ }
+ if reply.reply_ts != 0 {
+     if reply_agg.reply_ts != 0 {
+         reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
+     } else {
+         reply_agg.reply_ts = reply.reply_ts;
+     }
+ }
}
}
- self.agg_hs_feedback = agg;
+ self.agg_standby_feedback = StandbyFeedback {
+     reply: reply_agg,
+     hs_feedback: agg,
+ };
}
/// Update aggregated pageserver feedback. LSNs (last_received,
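
The aggregation branches in update_reply_feedback all follow one pattern: take the minimum over values that are valid (non-INVALID LSNs, non-zero timestamps). A hypothetical helper, not part of this commit, that would capture it:

fn min_valid<T: Ord + Copy>(agg: T, val: T, invalid: T) -> T {
    if val == invalid {
        agg // this sender has not reported yet; keep the aggregate
    } else if agg == invalid {
        val // first valid value seeds the aggregate
    } else {
        agg.min(val)
    }
}
// e.g. reply_agg.flush_lsn = min_valid(reply_agg.flush_lsn, reply.flush_lsn, Lsn::INVALID);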
@@ -764,8 +806,11 @@ mod tests {
fn test_hs_feedback_no_valid() {
let mut wss = WalSendersShared::new();
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
- wss.update_hs_feedback();
- assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
+ wss.update_reply_feedback();
+ assert_eq!(
+     wss.agg_standby_feedback.hs_feedback.xmin,
+     INVALID_FULL_TRANSACTION_ID
+ );
}
#[test]
@@ -774,8 +819,8 @@ mod tests {
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
push_feedback(&mut wss, hs_feedback(1, 42));
push_feedback(&mut wss, hs_feedback(1, 64));
- wss.update_hs_feedback();
- assert_eq!(wss.agg_hs_feedback.xmin, 42);
+ wss.update_reply_feedback();
+ assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
}
// form pageserver feedback with given last_record_lsn / tli size and the

View File

@@ -124,6 +124,7 @@ pub struct TimelineMemState {
pub remote_consistent_lsn: Lsn,
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
+ pub standby_flush_lsn: Lsn,
}
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
@@ -148,6 +149,7 @@ where
peer_horizon_lsn: state.peer_horizon_lsn,
remote_consistent_lsn: state.remote_consistent_lsn,
proposer_uuid: state.proposer_uuid,
+ standby_flush_lsn: Lsn::INVALID,
},
pers: state,
}

View File

@@ -270,6 +270,7 @@ impl SharedState {
backup_lsn: self.sk.state.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
+ standby_flush_lsn: self.sk.state.inmem.standby_flush_lsn.0,
}
}
@@ -628,9 +629,10 @@ impl Timeline {
// if this is AppendResponse, fill in proper pageserver and hot
// standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
- let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
- resp.hs_feedback = hs_feedback;
+ let (ps_feedback, standby_feedback) = self.walsenders.get_feedbacks();
+ resp.hs_feedback = standby_feedback.hs_feedback;
resp.pageserver_feedback = ps_feedback;
+ shared_state.sk.state.inmem.standby_flush_lsn = standby_feedback.reply.flush_lsn;
}
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
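
Note the interplay with the Lsn::INVALID sentinel: with no standbys connected, the aggregated reply.flush_lsn stays invalid (assuming StandbyReply::empty() zeroes its LSNs), so the guard in the pageserver's GC path leaves GC untouched:

let standby_flush_lsn = Lsn::INVALID; // no standby has reported
let new_gc_cutoff = Lsn(0x4000);
// The clamp is skipped entirely:
assert!(!(standby_flush_lsn != Lsn::INVALID && standby_flush_lsn < new_gc_cutoff));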

View File

@@ -42,6 +42,7 @@ message SafekeeperTimelineInfo {
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
+ uint64 standby_flush_lsn = 14;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string
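
Tag 14 is presumably the next unused field number in SafekeeperTimelineInfo; protobuf wire format keys on the tag, not on position in the message, so the new field can sit next to the other LSNs without renumbering existing fields.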

View File

@@ -734,6 +734,7 @@ mod tests {
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0,
availability_zone: None,
+ standby_flush_lsn: 0,
})
}