diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 44f14f8c7e..70590a0f95 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -391,15 +391,8 @@ impl SafekeeperPostgresHandler { // application_name: give only committed WAL (used by pageserver) or all // existing WAL (up to flush_lsn, used by walproposer or peer recovery). // The second case is always driven by a consensus leader which term - // must generally be also supplied. However we're sloppy to do this in - // walproposer recovery which will be removed soon. So TODO is to make - // it not Option'al then. - // - // Fetching WAL without term in recovery creates a small risk of this - // WAL getting concurrently garbaged if another compute rises which - // collects majority and starts fixing log on this safekeeper itself. - // That's ok as (old) proposer will never be able to commit such WAL. - let end_watch = if self.is_walproposer_recovery() { + // must be supplied. + let end_watch = if term.is_some() { EndWatch::Flush(tli.get_term_flush_lsn_watch_rx()) } else { EndWatch::Commit(tli.get_commit_lsn_watch_rx()) diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index feab7e605b..77d67cd63a 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -475,6 +475,46 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder): asyncio.run(run_unavailability(env, endpoint)) +async def run_recovery_uncommitted(env: NeonEnv): + (sk1, sk2, _) = env.safekeepers + + env.neon_cli.create_branch("test_recovery_uncommitted") + ep = env.endpoints.create_start("test_recovery_uncommitted") + ep.safe_psql("create table t(key int, value text)") + ep.safe_psql("insert into t select generate_series(1, 100), 'payload'") + + # insert with only one safekeeper up to create tail of flushed but not committed WAL + sk1.stop() + sk2.stop() + conn = await ep.connect_async() + # query should hang, so execute in separate task + bg_query = asyncio.create_task( + conn.execute("insert into t select generate_series(1, 2000), 'payload'") + ) + sleep_sec = 2 + await asyncio.sleep(sleep_sec) + # it must still be not finished + assert not bg_query.done() + # note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers. + ep.stop_and_destroy() + + # Start one of sks to make quorum online plus compute and ensure they can + # sync. + sk2.start() + ep = env.endpoints.create_start( + "test_recovery_uncommitted", + ) + ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'") + + +# Test pulling uncommitted WAL (up to flush_lsn) during recovery. +def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + asyncio.run(run_recovery_uncommitted(env)) + + @dataclass class RaceConditionTest: iteration: int