Propagate standby apply LSN to pageserver to hold off GC.

To avoid the pageserver GC'ing data still needed by a standby, propagate the
standby apply LSN through the standby -> safekeeper -> broker -> pageserver
flow and hold off GC for it. Each GC iteration resets the value, which removes
the horizon once the standby goes away -- pushes are assumed to happen at least
once between GC iterations. As a safety guard, the maximum allowed lag relative
to the normal GC horizon is hardcoded at 10GB.
Add a test for the feature.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Arseny Sher authored on 2024-05-21 14:32:29 +03:00, committed by Arseny Sher
commit 478cc37a70 (parent 4ce6e2d2fc)
12 changed files with 169 additions and 32 deletions
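Before the per-file hunks, a condensed, self-contained sketch of the pageserver-side guard this commit adds (see the timeline GC hunk below). It is illustrative only: LSNs are flattened to plain u64 values, 0 stands in for Lsn::INVALID, and the helper name apply_standby_horizon is invented for the sketch; only the 10GB constant and the clamp logic mirror the actual change.

// Illustrative sketch, not the pageserver code itself: how the new GC cutoff
// is clamped to the standby apply LSN, within a hardcoded safety margin.
const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB, as in the hunk below
const LSN_INVALID: u64 = 0; // stand-in for Lsn::INVALID

fn apply_standby_horizon(new_gc_cutoff: u64, standby_horizon: u64) -> u64 {
    if standby_horizon == LSN_INVALID {
        // No replica reported an apply LSN since the last GC iteration.
        return new_gc_cutoff;
    }
    match new_gc_cutoff.checked_sub(standby_horizon) {
        // Standby is behind the cutoff but within the allowed lag: hold GC for it.
        Some(lag) if lag < MAX_ALLOWED_STANDBY_LAG => standby_horizon,
        // Standby lags by 10 GB or more: keep the normal cutoff rather than pin GC.
        Some(_) => new_gc_cutoff,
        // Standby is already ahead of the cutoff: nothing to hold back.
        None => new_gc_cutoff,
    }
}

fn main() {
    let cutoff: u64 = 300 << 30;
    assert_eq!(apply_standby_horizon(cutoff, cutoff - (1 << 30)), cutoff - (1 << 30)); // 1 GB lag: held
    assert_eq!(apply_standby_horizon(cutoff, cutoff - (20 << 30)), cutoff); // 20 GB lag: ignored
    assert_eq!(apply_standby_horizon(cutoff, LSN_INVALID), cutoff); // no standby: normal cutoff
}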


@@ -50,6 +50,9 @@ pub struct SkTimelineInfo {
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub http_connstr: Option<String>,
// Minimum of all active RO replicas apply LSN
#[serde(default = "lsn_invalid")]
pub standby_horizon: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]


@@ -269,6 +269,8 @@ pub struct Timeline {
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
pub(crate) standby_horizon: AtomicLsn,
// WAL redo manager. `None` only for broken tenants.
walredo_mgr: Option<Arc<super::WalRedoManager>>,
@@ -2279,6 +2281,8 @@ impl Timeline {
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),
standby_horizon: AtomicLsn::new(0),
timeline_get_throttle: resources.timeline_get_throttle,
aux_files: tokio::sync::Mutex::new(AuxFilesState {
@@ -4844,7 +4848,29 @@ impl Timeline {
(horizon_cutoff, pitr_cutoff, retain_lsns)
};
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
let standby_horizon = self.standby_horizon.load();
// Hold GC for the standby, but as a safety guard do it only within some
// reasonable lag.
if standby_horizon != Lsn::INVALID {
if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
trace!("holding off GC for standby apply LSN {}", standby_horizon);
} else {
warn!(
"standby is lagging for more than {}MB, not holding gc for it",
MAX_ALLOWED_STANDBY_LAG / 1024 / 1024
)
}
}
}
// Reset standby horizon to ignore it if it is not updated till next GC.
// It is an easy way to unset it when standby disappears without adding
// more conf options.
self.standby_horizon.store(Lsn::INVALID);
let res = self
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)


@@ -705,6 +705,7 @@ impl ConnectionManagerState {
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
@@ -725,6 +726,17 @@ impl ConnectionManagerState {
WALRECEIVER_BROKER_UPDATES.inc();
trace!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon
);
if timeline_update.standby_horizon != 0 {
// ignore reports from safekeepers not connected to replicas
self.timeline
.standby_horizon
.store(Lsn(timeline_update.standby_horizon));
}
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(
new_safekeeper_id,
@@ -1094,6 +1106,7 @@ mod tests {
commit_lsn,
safekeeper_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
standby_horizon: 0,
},
latest_update,
}


@@ -186,6 +186,7 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
commit_lsn: sk_info.commit_lsn,
safekeeper_connstr: sk_info.safekeeper_connstr,
availability_zone: sk_info.availability_zone,
standby_horizon: 0,
};
// note this is a blocking call


@@ -350,6 +350,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,
standby_horizon: sk_info.standby_horizon.0,
};
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;


@@ -23,7 +23,7 @@ use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::pageserver_feedback::PageserverFeedback;
use std::cmp::min;
use std::cmp::{max, min};
use std::net::SocketAddr;
use std::str;
use std::sync::Arc;
@@ -85,8 +85,17 @@ impl StandbyReply {
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
reply: StandbyReply,
hs_feedback: HotStandbyFeedback,
pub reply: StandbyReply,
pub hs_feedback: HotStandbyFeedback,
}
impl StandbyFeedback {
pub fn empty() -> Self {
StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// WalSenders registry. Timeline holds it (wrapped in Arc).
@@ -162,8 +171,8 @@ impl WalSenders {
}
/// Get aggregated hot standby feedback (we send it to compute).
pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
self.mutex.lock().agg_hs_feedback
pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
self.mutex.lock().agg_standby_feedback
}
/// Record new pageserver feedback, update aggregated values.
@@ -184,6 +193,10 @@ impl WalSenders {
fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
debug!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
match &mut slot.feedback {
ReplicationFeedback::Standby(sf) => sf.reply = *reply,
ReplicationFeedback::Pageserver(_) => {
@@ -208,7 +221,7 @@ impl WalSenders {
})
}
}
shared.update_hs_feedback();
shared.update_reply_feedback();
}
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
@@ -226,13 +239,13 @@ impl WalSenders {
fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
shared.update_hs_feedback();
shared.update_reply_feedback();
}
}
struct WalSendersShared {
// aggregated over all walsenders value
agg_hs_feedback: HotStandbyFeedback,
agg_standby_feedback: StandbyFeedback,
// last feedback ever received from any pageserver, empty if none
last_ps_feedback: PageserverFeedback,
// total counter of pageserver feedbacks received
@@ -243,7 +256,7 @@ struct WalSendersShared {
impl WalSendersShared {
fn new() -> Self {
WalSendersShared {
agg_hs_feedback: HotStandbyFeedback::empty(),
agg_standby_feedback: StandbyFeedback::empty(),
last_ps_feedback: PageserverFeedback::empty(),
ps_feedback_counter: 0,
slots: Vec::new(),
@@ -260,10 +273,11 @@ impl WalSendersShared {
self.slots[id].as_mut().expect("walsender doesn't exist")
}
/// Update aggregated hot standy feedback. We just take min of valid xmins
/// Update aggregated hot standby and normal reply feedbacks. We just take min of valid xmins
/// and ts.
fn update_hs_feedback(&mut self) {
fn update_reply_feedback(&mut self) {
let mut agg = HotStandbyFeedback::empty();
let mut reply_agg = StandbyReply::empty();
for ws_state in self.slots.iter().flatten() {
if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
let hs_feedback = standby_feedback.hs_feedback;
@@ -276,7 +290,7 @@ impl WalSendersShared {
} else {
agg.xmin = hs_feedback.xmin;
}
agg.ts = min(agg.ts, hs_feedback.ts);
agg.ts = max(agg.ts, hs_feedback.ts);
}
if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
@@ -284,11 +298,43 @@ impl WalSendersShared {
} else {
agg.catalog_xmin = hs_feedback.catalog_xmin;
}
agg.ts = min(agg.ts, hs_feedback.ts);
agg.ts = max(agg.ts, hs_feedback.ts);
}
let reply = standby_feedback.reply;
if reply.write_lsn != Lsn::INVALID {
if reply_agg.write_lsn != Lsn::INVALID {
reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
} else {
reply_agg.write_lsn = reply.write_lsn;
}
}
if reply.flush_lsn != Lsn::INVALID {
if reply_agg.flush_lsn != Lsn::INVALID {
reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
} else {
reply_agg.flush_lsn = reply.flush_lsn;
}
}
if reply.apply_lsn != Lsn::INVALID {
if reply_agg.apply_lsn != Lsn::INVALID {
reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
} else {
reply_agg.apply_lsn = reply.apply_lsn;
}
}
if reply.reply_ts != 0 {
if reply_agg.reply_ts != 0 {
reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
} else {
reply_agg.reply_ts = reply.reply_ts;
}
}
}
}
self.agg_hs_feedback = agg;
self.agg_standby_feedback = StandbyFeedback {
reply: reply_agg,
hs_feedback: agg,
};
}
}
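For reference, a minimal standalone sketch of the apply-LSN aggregation that update_reply_feedback performs above: the minimum over all walsenders with a valid reply becomes the standby_horizon later pushed to the broker. The Lsn newtype and agg_apply_lsn helper here are simplified stand-ins, not the safekeeper's actual types.

// Simplified stand-in: the slowest valid replica determines the horizon.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64); // stand-in for utils::lsn::Lsn; Lsn(0) plays the role of Lsn::INVALID

fn agg_apply_lsn(replies: &[Lsn]) -> Lsn {
    replies
        .iter()
        .copied()
        .filter(|lsn| *lsn != Lsn(0)) // skip walsenders that haven't reported yet
        .min()
        .unwrap_or(Lsn(0)) // no standby connected: the horizon stays invalid
}

fn main() {
    // Two standbys plus one walsender without a reply: the laggard at 0x5000 wins.
    let replies = [Lsn(0x9000), Lsn(0x5000), Lsn(0)];
    assert_eq!(agg_apply_lsn(&replies), Lsn(0x5000));
}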
@@ -793,8 +839,11 @@ mod tests {
fn test_hs_feedback_no_valid() {
let mut wss = WalSendersShared::new();
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
wss.update_hs_feedback();
assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
wss.update_reply_feedback();
assert_eq!(
wss.agg_standby_feedback.hs_feedback.xmin,
INVALID_FULL_TRANSACTION_ID
);
}
#[test]
@@ -803,7 +852,7 @@ mod tests {
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
push_feedback(&mut wss, hs_feedback(1, 42));
push_feedback(&mut wss, hs_feedback(1, 64));
wss.update_hs_feedback();
assert_eq!(wss.agg_hs_feedback.xmin, 42);
wss.update_reply_feedback();
assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
}
}


@@ -248,6 +248,7 @@ impl SharedState {
&self,
ttid: &TenantTimelineId,
conf: &SafeKeeperConf,
standby_apply_lsn: Lsn,
) -> SafekeeperTimelineInfo {
SafekeeperTimelineInfo {
safekeeper_id: conf.my_id.0,
@@ -270,6 +271,7 @@ impl SharedState {
backup_lsn: self.sk.state.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
standby_horizon: standby_apply_lsn.0,
}
}
@@ -663,7 +665,7 @@ impl Timeline {
// if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby();
resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
}
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
@@ -716,7 +718,8 @@ impl Timeline {
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state().await;
shared_state.get_safekeeper_info(&self.ttid, conf)
let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
}
/// Update timeline state with peer safekeeper data.


@@ -147,6 +147,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,
};
counter += 1;
yield info;


@@ -42,6 +42,7 @@ message SafekeeperTimelineInfo {
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
uint64 standby_horizon = 14;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string
@@ -105,4 +106,6 @@ message SafekeeperDiscoveryResponse {
string safekeeper_connstr = 4;
// Availability zone of a safekeeper.
optional string availability_zone = 5;
// Replica apply LSN
uint64 standby_horizon = 6;
}


@@ -736,6 +736,7 @@ mod tests {
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0,
availability_zone: None,
standby_horizon: 0,
})
}


@@ -4288,6 +4288,17 @@ def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
time.sleep(1)
def log_replica_lag(primary: Endpoint, secondary: Endpoint):
last_replay_lsn = Lsn(
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
)
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)
lag = primary_lsn - last_replay_lsn
log.info(f"primary_lsn={primary_lsn}, replay_lsn={last_replay_lsn}, lag={lag}")
def wait_for_last_flush_lsn(
env: NeonEnv,
endpoint: Endpoint,


@@ -1,9 +1,20 @@
import os
import re
import threading
import time
from functools import partial
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, tenant_get_shards, wait_replica_caughtup
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
log_replica_lag,
tenant_get_shards,
wait_replica_caughtup,
)
from fixtures.utils import wait_until
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -104,19 +115,28 @@ def test_2_replicas_start(neon_simple_env: NeonEnv):
wait_replica_caughtup(primary, secondary2)
# We had an issue that a standby server made GetPage requests with an
# old LSN, based on the last-written LSN cache, to avoid waits in the
# pageserver. However, requesting a page with a very old LSN, such
# that the GC horizon has already advanced past it, results in an
# error from the pageserver:
# "Bad request: tried to request a page version that was garbage collected"
# Test two different scenarios related to GC of data needed by a hot standby.
#
# To avoid that, the compute<-> pageserver protocol was updated so
# that that the standby now sends two LSNs, the old last-written LSN
# and the current replay LSN.
# When pause_apply is False, the standby is mostly caught up with the primary.
# However, in compute <-> pageserver protocol version 1 only one LSN was sent
# to the pageserver in a page request, and to avoid waits in the pageserver it
# was the last-written LSN cache value. If a page hadn't been updated for a
# long time, that resulted in an error from the pageserver: "Bad request: tried
# to request a page version that was garbage collected". For the primary this
# wasn't a problem because the pageserver always bumped the LSN to the newest
# one; for a standby that would be incorrect since we might get a page fresher
# than the apply LSN. Hence, protocol version v2 introduced two LSNs: the main
# request_lsn (apply LSN in the case of a standby) and not_modified_since,
# which can be used as an optimization to avoid waiting.
#
# https://github.com/neondatabase/neon/issues/6211
def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder):
#
# When pause_apply is True we model a standby lagging behind the primary (e.g.
# due to a high max_standby_streaming_delay). To prevent the pageserver from
# removing data still needed by the standby, its apply LSN is propagated along
# the standby -> safekeeper -> broker -> pageserver flow so that the pageserver
# can hold off GC for it.
@pytest.mark.parametrize("pause_apply", [False, True])
def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
tenant_conf = {
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
@@ -160,6 +180,9 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder):
# so we still remember the LSNs of the pages.
s_cur.execute("SELECT clear_buffer_cache()")
if pause_apply:
s_cur.execute("SELECT pg_wal_replay_pause()")
# Do other stuff on the primary, to advance the WAL
p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
@@ -176,6 +199,8 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder):
# generates use old not_modified_since LSNs, older than
# the GC cutoff, but new request LSNs. (In protocol
# version 1 there was only one LSN, and this failed.)
log_replica_lag(primary, secondary)
s_cur.execute("SELECT COUNT(*) FROM test")
log_replica_lag(primary, secondary)
res = s_cur.fetchone()
assert res[0] == 10000
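As a postscript to the protocol discussion in the test comment above, a purely illustrative sketch of the two-LSN request shape. The GetPageRequest struct and standby_request helper are hypothetical names for this sketch (the real compute <-> pageserver messages are defined elsewhere); only the roles of request_lsn and not_modified_since follow the comment's description.

// Hypothetical sketch of a v2-style page request from a standby.
#[derive(Debug)]
struct GetPageRequest {
    request_lsn: u64,        // standby: its apply LSN; primary: the latest LSN
    not_modified_since: u64, // hint: the page is known unchanged since this LSN
}

fn standby_request(apply_lsn: u64, last_written_lsn: u64) -> GetPageRequest {
    GetPageRequest {
        // Never ask for anything newer than what the standby has applied,
        // otherwise it could see a page "from the future" of its replay position.
        request_lsn: apply_lsn,
        // The hint may be very old; it only lets the pageserver avoid waiting,
        // the page itself is served as of request_lsn.
        not_modified_since: last_written_lsn.min(apply_lsn),
    }
}

fn main() {
    let req = standby_request(0x9000, 0x1000);
    println!("{req:?}");
}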