Mirror of https://github.com/neondatabase/neon.git
Compare commits: arpad/log_ ... problame/s (7 commits)
| SHA1 |
|---|
| b5c4ea45a0 |
| 3d8ca0245c |
| b401cec157 |
| 02aa6c8302 |
| 999f3900f2 |
| d2e72f119f |
| 45f5dfc685 |
@@ -1195,6 +1195,8 @@ impl PageServerHandler {
             })
             .await?;
 
+            failpoint_support::pausable_failpoint!("pagestream_read_message:before_gc_cutoff_check", cancel).unwrap();
+
             // We're holding the Handle
             let effective_lsn = match Self::effective_request_lsn(
                 &shard,

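The new failpoint is exercised from the Python test added later in this compare; the following is only an orientation sketch of how it is toggled through the pageserver HTTP API, using the helper names that appear in that test:

    # pause getpage handling right before the gc-cutoff check on every shard
    for _, pageserver in tenant_get_shards(env, tenant_id, None):
        pageserver.http_client().configure_failpoints(
            ("pagestream_read_message:before_gc_cutoff_check", "pause")
        )
    # ... let the in-flight requests pile up on the failpoint ...
    for _, pageserver in tenant_get_shards(env, tenant_id, None):
        pageserver.http_client().configure_failpoints(
            ("pagestream_read_message:before_gc_cutoff_check", "off")
        )
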
@@ -6628,12 +6628,12 @@ impl Timeline {
         let standby_horizon = self.standby_horizon.load();
         // Hold GC for the standby, but as a safety guard do it only within some
         // reasonable lag.
-        if standby_horizon != Lsn::INVALID {
+        if true && standby_horizon != Lsn::INVALID {
             if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
                 const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
                 if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
                     new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
-                    trace!("holding off GC for standby apply LSN {}", standby_horizon);
+                    info!("holding off GC for standby apply LSN {}", standby_horizon);
                 } else {
                     warn!(
                         "standby is lagging for more than {}MB, not holding gc for it",

@@ -750,7 +750,7 @@ impl ConnectionManagerState {
 
         WALRECEIVER_BROKER_UPDATES.inc();
 
-        trace!(
+        info!(
             "safekeeper info update: standby_horizon(cutoff)={}",
             timeline_update.standby_horizon
         );

@@ -220,7 +220,7 @@ impl WalSenders {
     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
         let mut shared = self.mutex.lock();
         let slot = shared.get_slot_mut(id);
-        debug!(
+        info!(
             "Record standby reply: ts={} apply_lsn={}",
             reply.reply_ts, reply.apply_lsn
         );

@@ -233,6 +233,7 @@ impl WalSenders {
                 })
             }
         }
+        shared.update_reply_feedback();
     }
 
     /// Record hot standby feedback, update aggregated value.

@@ -400,7 +401,10 @@ impl WalSendersShared {
             }
         }
         self.agg_standby_feedback = StandbyFeedback {
-            reply: reply_agg,
+            reply: {
+                info!(prev=%self.agg_standby_feedback.reply.apply_lsn, new=%reply_agg.apply_lsn, "updating agg_standby_feedback apply_lsn");
+                reply_agg
+            },
             hs_feedback: agg,
         };
     }

@@ -5631,21 +5631,30 @@ def tenant_get_shards(
     ]
 
 
-def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
-    primary_lsn = Lsn(
-        primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
-    )
-    while True:
-        secondary_lsn = Lsn(
-            secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
+def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint, primary_cursor=None, secondary_cursor=None):
+    if primary_cursor is not None:
+        primary_cursor.execute("SELECT pg_current_wal_flush_lsn()")
+        [res] = primary_cursor.fetchone()
+        primary_lsn = Lsn(res)
+    else:
+        primary_lsn = Lsn(
+            primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
+        )
+    while True:
+        if secondary_cursor is not None:
+            secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
+            [res] = secondary_cursor.fetchone()
+            secondary_lsn = Lsn(res)
+        else:
+            secondary_lsn = Lsn(
+                secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
             )
         caught_up = secondary_lsn >= primary_lsn
         log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
         if caught_up:
             return
         time.sleep(1)
 
 
 def log_replica_lag(primary: Endpoint, secondary: Endpoint):
     last_replay_lsn = Lsn(
         secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)

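The new optional cursor parameters let a caller poll the primary/secondary LSNs over connections it already holds, presumably so the wait keeps working even when opening a fresh query path on the replica could block. A minimal usage sketch, assuming the cursors were opened beforehand:

    # poll LSNs via pre-established cursors instead of new connections
    p_cur = primary.connect().cursor()
    s_cur = secondary.connect().cursor()
    wait_replica_caughtup(primary, secondary, primary_cursor=p_cur, secondary_cursor=s_cur)
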
@@ -1,12 +1,16 @@
 from __future__ import annotations
 
 import asyncio
+import concurrent.futures
+import contextlib
 import os
 import threading
 import time
 from functools import partial
 
+import psycopg2
 import pytest
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,

@@ -133,6 +137,9 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
     tenant_conf = {
         # set PITR interval to be small, so we can do GC
         "pitr_interval": "0 s",
+        # this test is largely about PS GC behavior, we control it manually
+        "gc_period": "0s",
+        "compaction_period": "0s",
     }
     env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
     timeline_id = env.initial_timeline

@@ -163,6 +170,11 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
             res = s_cur.fetchone()
             assert res is not None
 
+            s_cur.execute("SHOW hot_standby_feedback")
+            res = s_cur.fetchone()
+            assert res is not None
+            assert res[0] == "off"
+
             s_cur.execute("SELECT COUNT(*) FROM test")
             res = s_cur.fetchone()
             assert res == (10000,)

@@ -198,6 +210,230 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
             res = s_cur.fetchone()
             assert res == (10000,)
 
+            if pause_apply:
+                s_cur.execute("SELECT pg_wal_replay_resume()")
+
+            wait_replica_caughtup(primary, secondary)
+
+            # Wait for PS's view of standby horizon to catch up.
+            # (When we switch to leases (LKB-88) we need to change this to watch the lease lsn move.)
+            # (TODO: instead of checking impl details here, somehow assert that gc can delete layers now.
+            #  Tricky to do that without flakiness though.)
+            # We already waited for the replica to catch up, so this timeout is strictly on
+            # a few in-memory-only RPCs to propagate standby_horizon.
+            timeout_secs = 10
+            started_at = time.time()
+            shards = tenant_get_shards(env, tenant_id, None)
+            for tenant_shard_id, pageserver in shards:
+                client = pageserver.http_client()
+                while True:
+                    secondary_apply_lsn = Lsn(
+                        secondary.safe_psql_scalar(
+                            "SELECT pg_last_wal_replay_lsn()", log_query=False
+                        )
+                    )
+                    standby_horizon_metric = client.get_metrics().query_one(
+                        "pageserver_standby_horizon",
+                        {
+                            "tenant_id": str(tenant_shard_id.tenant_id),
+                            "shard_id": str(tenant_shard_id.shard_index),
+                            "timeline_id": str(timeline_id),
+                        },
+                    )
+                    standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
+                    log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
+                    if secondary_apply_lsn == standby_horizon_at_ps:
+                        break
+                    if time.time() - started_at > timeout_secs:
+                        pytest.fail(f"standby_horizon didn't propagate within {timeout_secs=}, this is holding up gc on secondary")
+                    time.sleep(1)
+
+
+def test_hot_standby_gc_with_inflight_requests(neon_env_builder: NeonEnvBuilder):
+    tenant_conf = {
+        # set PITR interval to be small, so we can do GC
+        "pitr_interval": "0 s",
+        # this test is largely about PS GC behavior, we control it manually
+        "gc_period": "0s",
+        "compaction_period": "0s",
+    }
+    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
+    timeline_id = env.initial_timeline
+    tenant_id = env.initial_tenant
+
+    with contextlib.ExitStack() as stack:
+        executor = stack.enter_context(concurrent.futures.ThreadPoolExecutor(max_workers=2))
+        primary = stack.enter_context(
+            env.endpoints.create_start(branch_name="main", endpoint_id="primary")
+        )
+        secondary = stack.enter_context(
+            env.endpoints.new_replica_start(
+                origin=primary,
+                endpoint_id="secondary",
+                # Protocol version 2 was introduced to fix the issue
+                # that this test exercises. With protocol version 1 it
+                # fails.
+                config_lines=["neon.protocol_version=2"],
+            )
+        )
+
+        p_cur = primary.connect().cursor()
+        s_cur = secondary.connect().cursor()
+        s_cur2 = secondary.connect().cursor()
+
+        p_cur.execute("CREATE EXTENSION neon_test_utils")
+        p_cur.execute("CREATE TABLE test (id int primary key) WITH (autovacuum_enabled=false)")
+
+        # helper
+        def replay_lsn():
+            s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
+            [replay_lsn] = s_cur2.fetchone()
+            return Lsn(replay_lsn)
+
+        #
+        # create initial set of data the standby can query
+        #
+        p_cur.execute("INSERT INTO test SELECT generate_series(1, 10000) AS g")
+
+        #
+        # wait until standby is caught up
+        #
+        wait_replica_caughtup(primary, secondary)
+        s_cur.execute("SELECT COUNT(*) FROM test")
+        res = s_cur.fetchone()
+        assert res == (10000,)
+
+        #
+        # clear standby cache so that the select we
+        # do in make_replica_send_getpage() below actually
+        # sends getpage requests.
+        #
+        secondary.clear_buffers(cursor=s_cur)
+
+        #
+        # Even a simple SELECT pg_last_wal_replay_lsn() needs to read some pages.
+        # Fault them in now so that s_cur2 will be unaffected by
+        # the pausing of getpage request handling below.
+        # (This was quite tricky to debug; if something breaks in the future and
+        # you need to debug this test, try printing backend IDs and correlating them
+        # with "slow getpage" logs in compute.log.)
+        # (All of this pain would go away if we could scope the failpoint to
+        # individual backends' getpage requests.
+        # TODO: add such capability in this PR, it's not much work and the failpoint
+        # is too high overhead anyway.)
+        #
+        s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
+
+        #
+        # Stop wal replay on secondary.
+        #
+        s_cur2.execute("SELECT pg_wal_replay_pause()")
+
+        #
+        # Advance lsn on primary, we'll use it later to advance lsn on secondary
+        # when we resume replay on secondary.
+        #
+        p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
+
+        #
+        # Block any more getpage requests really early, before we get the RCU read
+        # for applied_gc_cutoff_lsn.
+        # This simulates delayed requests.
+        #
+        shards = tenant_get_shards(env, tenant_id, None)
+        for _, pageserver in shards:
+            client = pageserver.http_client()
+            client.configure_failpoints(
+                (
+                    "pagestream_read_message:before_gc_cutoff_check",
+                    "pause",
+                )
+            )
+
+        #
+        # on the secondary, kick off a bunch of getpage requests
+        # at the current apply_lsn; they will hit the failpoint,
+        # simulating that they're stuck somewhere in the network
+        #
+        def make_replica_send_getpage():
+            s_cur.execute("SELECT COUNT(*) FROM test")
+            res = s_cur.fetchone()
+            assert res == (10000,)
+
+        getpage_requests_task = executor.submit(make_replica_send_getpage)
+
+        #
+        # wait until the requests have hit the failpoint
+        # TODO: use something not timing dependent here
+        #
+        time.sleep(5)
+
+        #
+        # advance apply_lsn by resuming replay temporarily
+        #
+        submitted_lsn = replay_lsn()
+        log.info(f"submitted getpages with request_lsn={submitted_lsn}")
+        log.info("resuming wal replay")
+        s_cur2.execute("SELECT pg_wal_replay_resume()")
+        log.info("waiting for secondary to catch up")
+        while True:
+            current_replay_lsn = replay_lsn()
+            log.info(f"{current_replay_lsn=} {submitted_lsn=}")
+            if current_replay_lsn > submitted_lsn + 8192:
+                break
+            time.sleep(1)
+        log.info("pausing wal replay")
+        s_cur2.execute("SELECT pg_wal_replay_pause()")
+
+        #
+        # secondary now is at a higher apply_lsn;
+        # wait for it to propagate into standby_horizon
+        #
+        for tenant_shard_id, pageserver in shards:
+            client = pageserver.http_client()
+            # wait for standby horizon to catch up
+            while True:
+                metrics = client.get_metrics()
+                sample = metrics.query_one(
+                    "pageserver_standby_horizon",
+                    {
+                        "tenant_id": str(tenant_shard_id.tenant_id),
+                        "shard_id": str(tenant_shard_id.shard_index),
+                        "timeline_id": str(timeline_id),
+                    },
+                )
+                current_standby_horizon = Lsn(int(sample.value))
+                current_replay_lsn = replay_lsn()
+                log.info(f"{current_standby_horizon=} {current_replay_lsn=}")
+                if current_standby_horizon == current_replay_lsn:
+                    break
+                time.sleep(1)
+
+        #
+        # now trigger gc; it will cut off at standby_horizon, i.e.,
+        # at the advanced apply_lsn, above the delayed requests' request_lsn
+        #
+        log.info("do gc")
+        for tenant_shard_id, pageserver in shards:
+            client = pageserver.http_client()
+            client.timeline_checkpoint(tenant_shard_id, timeline_id)
+            client.timeline_compact(tenant_shard_id, timeline_id)
+            gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
+            log.info(f"{gc_status=}")
+            detail = client.timeline_detail(tenant_shard_id, timeline_id)
+            log.info(f"{detail=}")
+            assert Lsn(detail["applied_gc_cutoff_lsn"]) == current_standby_horizon
+
+        #
+        # unblock the requests that were delayed;
+        # until we fix the bug, they will fail because their request_lsn is below the standby horizon
+        #
+        log.info("unblock requests")
+        for _, pageserver in shards:
+            client = pageserver.http_client()
+            client.configure_failpoints(
+                (
+                    "pagestream_read_message:before_gc_cutoff_check",
+                    "off",
+                )
+            )
+        log.info("waiting for select to complete")
+        # until the bug is fixed, the blocked getpage requests will fail with the error below
+        expect_fail = f"requested at {submitted_lsn} gc cutoff {current_standby_horizon}"
+        log.info(f"until the bug is fixed, we expect the getpage task to fail with a postgres IO error because of a failed getpage, with the following message: {expect_fail}")
+        getpage_requests_task.result()
+        log.info("the delayed requests completed without errors, woohoo, the bug is fixed")
+
+
 def run_pgbench(connstr: str, pg_bin: PgBin):
     log.info(f"Start a pgbench workload on pg {connstr}")