extend test_hot_standby_gc to demonstrate that it doesn't work without hot_standby_feedback
The bug described in https://github.com/neondatabase/neon/issues/12168 is not covered by `test_hot_standby_gc[pause_apply=True]`. The reason is that the first standby reply _does_ trigger an update of the aggregate, but subsequent standby replies don't; only hot_standby_feedback messages do. In the test, that first reply sets standby_horizon to a low value X, thereby inhibiting GC and thereby making the test pass.

However, the aggregate standby_horizon pushed by the safekeepers remains at that low value X from then on. In other words, standby_horizon is _kept_ at X. Thus, if the pageserver receives a re-push of the low value X before the next GC run, it will continue to limit GC to below X until we hit the hard-coded `MAX_ALLOWED_STANDBY_LAG`. This is worse than I thought: the current code _is_ preventing GC _even_ for standbys _without_ `hot_standby_feedback`, if those standbys shut down before 10GiB of WAL has been written since they were launched.

Turn some debug logging into info logging to make the case. Sigh.

refs https://github.com/neondatabase/neon/issues/12168
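For context, the GC holdoff the message refers to works roughly like the following sketch. This is a simplified, self-contained model, not the pageserver's actual code: `MAX_ALLOWED_STANDBY_LAG`, the `Lsn::min` clamp, and the log messages come from the diff below, while the `Lsn` stub and the function signature are assumptions for illustration.

    use tracing::{info, warn};

    // Minimal LSN stand-in; the real pageserver type has more to it.
    #[derive(Clone, Copy)]
    struct Lsn(u64);

    impl Lsn {
        fn min(a: Lsn, b: Lsn) -> Lsn {
            if a.0 < b.0 { a } else { b }
        }
    }

    /// Sketch of the clamp in the first hunk below: as long as the standby
    /// horizon is within 10 GiB of the candidate cutoff, GC is held back to it.
    /// A re-pushed stale horizon X therefore keeps limiting GC to below X.
    fn apply_standby_horizon(standby_horizon: Lsn, mut new_gc_cutoff: Lsn) -> Lsn {
        const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
        let standby_lag = new_gc_cutoff.0.saturating_sub(standby_horizon.0);
        if standby_lag < MAX_ALLOWED_STANDBY_LAG {
            new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
            info!("holding off GC for standby apply LSN {}", standby_horizon.0);
        } else {
            warn!(
                "standby is lagging for more than {}MB, not holding gc for it",
                standby_lag / (1 << 20)
            );
        }
        new_gc_cutoff
    }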
@@ -6633,7 +6633,7 @@ impl Timeline {
             const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
             if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
                 new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
-                trace!("holding off GC for standby apply LSN {}", standby_horizon);
+                info!("holding off GC for standby apply LSN {}", standby_horizon);
             } else {
                 warn!(
                     "standby is lagging for more than {}MB, not holding gc for it",
@@ -750,7 +750,7 @@ impl ConnectionManagerState {
                 WALRECEIVER_BROKER_UPDATES.inc();
 
-                trace!(
+                info!(
                     "safekeeper info update: standby_horizon(cutoff)={}",
                     timeline_update.standby_horizon
                 );
 
@@ -220,7 +220,7 @@ impl WalSenders {
     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
         let mut shared = self.mutex.lock();
         let slot = shared.get_slot_mut(id);
-        debug!(
+        info!(
             "Record standby reply: ts={} apply_lsn={}",
             reply.reply_ts, reply.apply_lsn
         );
@@ -400,7 +400,10 @@ impl WalSendersShared {
             }
         }
         self.agg_standby_feedback = StandbyFeedback {
-            reply: reply_agg,
+            reply: {
+                info!(prev=%self.agg_standby_feedback.reply.apply_lsn, new=%reply_agg.apply_lsn, "updating agg_standby_feedback apply_lsn");
+                reply_agg
+            },
             hs_feedback: agg,
         };
     }
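To make the failure mode concrete, here is a deliberately simplified, hypothetical model of the aggregation described in the commit message. The type and method names echo the diff above, but the bodies are illustrative assumptions, not the real safekeeper code:

    // Hypothetical model: a plain standby reply refreshes the aggregate only
    // the first time; afterwards, only hot_standby_feedback recomputes it.
    #[derive(Clone, Copy, Default)]
    struct StandbyReply {
        apply_lsn: u64,
    }

    struct WalSendersModel {
        slots: Vec<StandbyReply>, // latest reply per walsender
        agg_apply_lsn: u64,       // aggregate standby_horizon pushed to the pageserver
    }

    impl WalSendersModel {
        fn with_senders(n: usize) -> Self {
            WalSendersModel {
                slots: vec![StandbyReply::default(); n],
                agg_apply_lsn: 0,
            }
        }

        fn record_standby_reply(&mut self, id: usize, reply: StandbyReply) {
            let first = self.slots[id].apply_lsn == 0;
            self.slots[id] = reply;
            if first {
                // Only the very first reply happens to refresh the aggregate.
                self.update_aggregate();
            }
        }

        fn record_hs_feedback(&mut self) {
            // hot_standby_feedback messages are what reliably recompute it.
            self.update_aggregate();
        }

        fn update_aggregate(&mut self) {
            // Hold GC for the slowest standby: take the minimum reported apply_lsn.
            self.agg_apply_lsn = self
                .slots
                .iter()
                .map(|r| r.apply_lsn)
                .filter(|&lsn| lsn != 0)
                .min()
                .unwrap_or(0);
        }
    }

In this model, a standby without hot_standby_feedback that first replies at apply_lsn = X and then keeps advancing never moves agg_apply_lsn past X. That is exactly the "kept at X" behavior the commit message describes, and what the added info! logging is meant to surface.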
@@ -7,6 +7,7 @@ import time
 from functools import partial
 
 import pytest
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
@@ -163,6 +164,11 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
         res = s_cur.fetchone()
         assert res is not None
 
+        s_cur.execute("SHOW hot_standby_feedback")
+        res = s_cur.fetchone()
+        assert res is not None
+        assert res[0] == "off"
+
         s_cur.execute("SELECT COUNT(*) FROM test")
         res = s_cur.fetchone()
         assert res == (10000,)
@@ -198,6 +204,70 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
         res = s_cur.fetchone()
         assert res == (10000,)
 
+        if pause_apply:
+            s_cur.execute("SELECT pg_wal_replay_resume()")
+
+        wait_replica_caughtup(primary, secondary)
+
+        # Wait for standby horizon to catch up, then gc again.
+        # (When we switch to leases (LKB-88) we need to wait for leases to expire.)
+        shards = tenant_get_shards(env, tenant_id, None)
+        for tenant_shard_id, pageserver in shards:
+            client = pageserver.http_client()
+            while True:
+                secondary_apply_lsn = Lsn(
+                    secondary.safe_psql_scalar(
+                        "SELECT pg_last_wal_replay_lsn()", log_query=False
+                    )
+                )
+                standby_horizon_metric = client.get_metrics().query_one(
+                    "pageserver_standby_horizon",
+                    {
+                        "tenant_id": str(tenant_shard_id.tenant_id),
+                        "shard_id": str(tenant_shard_id.shard_index),
+                        "timeline_id": str(timeline_id),
+                    },
+                )
+                standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
+                log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
+                if secondary_apply_lsn == standby_horizon_at_ps:
+                    break
+                time.sleep(1)
+            client.timeline_checkpoint(tenant_shard_id, timeline_id)
+            client.timeline_compact(tenant_shard_id, timeline_id)
+            client.timeline_gc(tenant_shard_id, timeline_id, 0)
+
+        # Clear the cache in the standby, so that when we
+        # re-execute the query, it will make GetPage
+        # requests. This does not clear the last-written LSN cache
+        # so we still remember the LSNs of the pages.
+        secondary.clear_buffers(cursor=s_cur)
+
+        if pause_apply:
+            s_cur.execute("SELECT pg_wal_replay_pause()")
+
+        # Do other stuff on the primary, to advance the WAL
+        p_cur.execute("CREATE TABLE test3 AS SELECT generate_series(1, 1000000) AS g")
+
+        # Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
+        # very close to the primary's current insert LSN.
+        shards = tenant_get_shards(env, tenant_id, None)
+        for tenant_shard_id, pageserver in shards:
+            client = pageserver.http_client()
+            client.timeline_checkpoint(tenant_shard_id, timeline_id)
+            client.timeline_compact(tenant_shard_id, timeline_id)
+            client.timeline_gc(tenant_shard_id, timeline_id, 0)
+
+        # Re-execute the query. The GetPage requests that this
+        # generates use old not_modified_since LSNs, older than
+        # the GC cutoff, but new request LSNs. (In protocol
+        # version 1 there was only one LSN, and this failed.)
+        log_replica_lag(primary, secondary)
+        s_cur.execute("SELECT COUNT(*) FROM test")
+        log_replica_lag(primary, secondary)
+        res = s_cur.fetchone()
+        assert res == (10000,)
 
 
 def run_pgbench(connstr: str, pg_bin: PgBin):
     log.info(f"Start a pgbench workload on pg {connstr}")