Compare commits

...

1 Commits

Author SHA1 Message Date
Heikki Linnakangas
13a0abcf71 XXX: Add test that currently fails.
endpoint's log:

    PG:2024-06-30 20:49:08.805 GMT [1248196] ERROR:  could not find record while sending logically-decoded data: invalid info bits 0000 in WAL segment 000000010000000000000002, LSN 0/2000000, offset 0
    PG:2024-06-30 20:49:08.805 GMT [1248196] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
2024-06-30T20:49:10.616322Z  INFO received 2 termination signal
2024-07-01 00:07:12 +03:00

View File

@@ -414,6 +414,53 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
"select sum(somedata) from replication_example"
) == endpoint.safe_psql("select sum(somedata) from replication_example")
# Test compute start at a new WAL segment.
# Similar to issue https://github.com/neondatabase/neon/issues/5749, but start
# the new compute right at WAL segment boundary.
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_restart_streaming_at_segment_boundary(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
env.neon_cli.create_branch("init")
endpoint = env.endpoints.create_start("init")
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
cur = endpoint.connect().cursor()
cur.execute("create table t(key int, value text)")
cur.execute("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
cur.execute("insert into replication_example values (1, 2)")
cur.execute("create publication pub1 for table replication_example")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key, value text)")
vanilla_pg.safe_psql("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
connstr = endpoint.connstr().replace("'", "''")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
logical_replication_sync(vanilla_pg, endpoint)
vanilla_pg.stop()
with endpoint.cursor() as cur:
cur.execute(f"select pg_switch_wal()")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop(mode="immediate")
endpoint.start()
cur = endpoint.connect().cursor()
# this should flush current wal page
cur.execute("insert into replication_example values (3, 4)")
vanilla_pg.start()
logical_replication_sync(vanilla_pg, endpoint)
assert vanilla_pg.safe_psql(
"select sum(somedata) from replication_example"
) == endpoint.safe_psql("select sum(somedata) from replication_example")
# Test that WAL redo works for fairly large records.
#