mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
Fix safekeeper START_REPLICATION (term=n).
It was giving WAL only up to commit_lsn instead of flush_lsn, so recovery of
uncommitted WAL since cdb08f03 hanged. Add test for this.
This commit is contained in:
@@ -391,15 +391,8 @@ impl SafekeeperPostgresHandler {
|
||||
// application_name: give only committed WAL (used by pageserver) or all
|
||||
// existing WAL (up to flush_lsn, used by walproposer or peer recovery).
|
||||
// The second case is always driven by a consensus leader which term
|
||||
// must generally be also supplied. However we're sloppy to do this in
|
||||
// walproposer recovery which will be removed soon. So TODO is to make
|
||||
// it not Option'al then.
|
||||
//
|
||||
// Fetching WAL without term in recovery creates a small risk of this
|
||||
// WAL getting concurrently garbaged if another compute rises which
|
||||
// collects majority and starts fixing log on this safekeeper itself.
|
||||
// That's ok as (old) proposer will never be able to commit such WAL.
|
||||
let end_watch = if self.is_walproposer_recovery() {
|
||||
// must be supplied.
|
||||
let end_watch = if term.is_some() {
|
||||
EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
|
||||
} else {
|
||||
EndWatch::Commit(tli.get_commit_lsn_watch_rx())
|
||||
|
||||
@@ -475,6 +475,46 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
|
||||
asyncio.run(run_unavailability(env, endpoint))
|
||||
|
||||
|
||||
async def run_recovery_uncommitted(env: NeonEnv):
|
||||
(sk1, sk2, _) = env.safekeepers
|
||||
|
||||
env.neon_cli.create_branch("test_recovery_uncommitted")
|
||||
ep = env.endpoints.create_start("test_recovery_uncommitted")
|
||||
ep.safe_psql("create table t(key int, value text)")
|
||||
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
|
||||
|
||||
# insert with only one safekeeper up to create tail of flushed but not committed WAL
|
||||
sk1.stop()
|
||||
sk2.stop()
|
||||
conn = await ep.connect_async()
|
||||
# query should hang, so execute in separate task
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# it must still be not finished
|
||||
assert not bg_query.done()
|
||||
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
|
||||
ep.stop_and_destroy()
|
||||
|
||||
# Start one of sks to make quorum online plus compute and ensure they can
|
||||
# sync.
|
||||
sk2.start()
|
||||
ep = env.endpoints.create_start(
|
||||
"test_recovery_uncommitted",
|
||||
)
|
||||
ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")
|
||||
|
||||
|
||||
# Test pulling uncommitted WAL (up to flush_lsn) during recovery.
|
||||
def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
asyncio.run(run_recovery_uncommitted(env))
|
||||
|
||||
|
||||
@dataclass
|
||||
class RaceConditionTest:
|
||||
iteration: int
|
||||
|
||||
Reference in New Issue
Block a user