Compare commits

..

1 Commits

Author SHA1 Message Date
Heikki Linnakangas
13a0abcf71 XXX: Add test that currently fails.
endpoint's log:

    PG:2024-06-30 20:49:08.805 GMT [1248196] ERROR:  could not find record while sending logically-decoded data: invalid info bits 0000 in WAL segment 000000010000000000000002, LSN 0/2000000, offset 0
    PG:2024-06-30 20:49:08.805 GMT [1248196] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
2024-06-30T20:49:10.616322Z  INFO received 2 termination signal
2024-07-01 00:07:12 +03:00
3 changed files with 47 additions and 103 deletions

View File

@@ -581,8 +581,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
}
case PS_Connecting_PageStream:
{
PGresult *result;
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_PageStream");
if (PQstatus(shard->conn) == CONNECTION_BAD)
@@ -622,48 +620,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
}
}
}
result = PQgetResult(shard->conn);
if (!result)
{
CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, elevel,
"could not complete handshake: unexpected end of data");
return false;
}
switch (PQresultStatus(result))
{
/*
* We only expect COPY_BOTH, all other responses are indications of
* problems out of our control
*/
case PGRES_COPY_BOTH:
break;
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, elevel,
"could not complete handshake: PageServer returned error: %s",
PQresultErrorMessage(result));
PQclear(result);
return false;
case PGRES_EMPTY_QUERY:
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_SINGLE_TUPLE:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, elevel,
"could not complete handshake: unexpected result type: %d",
PQresultStatus(result));
PQclear(result);
return false;
}
shard->state = PS_Connected;
/* fallthrough */

View File

@@ -414,6 +414,53 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
"select sum(somedata) from replication_example"
) == endpoint.safe_psql("select sum(somedata) from replication_example")
# Test compute start at a new WAL segment.
# Similar to issue https://github.com/neondatabase/neon/issues/5749, but start
# the new compute right at WAL segment boundary.
# NOTE(review): "pageserver_aux_file_policy" is not a parameter of the test
# itself; presumably it is consumed by a fixture behind neon_simple_env --
# confirm against the fixture definitions.
@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_restart_streaming_at_segment_boundary(neon_simple_env: NeonEnv, vanilla_pg):
    """Restart the publisher right after a WAL switch and check replication resumes.

    Steps:
      1. Start an endpoint on branch "init", create ``replication_example``
         and publish it as ``pub1``.
      2. Start ``vanilla_pg`` as the subscriber (``sub1``), wait for the
         initial sync, then stop it.
      3. Call ``pg_switch_wal()`` on the endpoint, wait for the last flush
         LSN, then stop the endpoint in "immediate" mode and start it again.
      4. Insert one more row, restart the subscriber, wait for sync and
         compare ``sum(somedata)`` on both sides.
    """
    env = neon_simple_env

    env.neon_cli.create_branch("init")
    endpoint = env.endpoints.create_start("init")
    tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
    timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]

    cur = endpoint.connect().cursor()
    cur.execute("create table t(key int, value text)")
    cur.execute("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
    cur.execute("insert into replication_example values (1, 2)")
    cur.execute("create publication pub1 for table replication_example")

    # now start subscriber
    vanilla_pg.start()
    vanilla_pg.safe_psql("create table t(pk integer primary key, value text)")
    vanilla_pg.safe_psql("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")

    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
    # Single quotes must be doubled because the connection string is embedded
    # in a single-quoted SQL literal below.
    connstr = endpoint.connstr().replace("'", "''")
    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
    logical_replication_sync(vanilla_pg, endpoint)
    vanilla_pg.stop()

    # Switch WAL files before the restart (plain literal: there is nothing to
    # interpolate, so no f-prefix).
    with endpoint.cursor() as cur:
        cur.execute("select pg_switch_wal()")

    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
    endpoint.stop(mode="immediate")
    endpoint.start()

    cur = endpoint.connect().cursor()
    # this should flush current wal page
    cur.execute("insert into replication_example values (3, 4)")

    vanilla_pg.start()
    logical_replication_sync(vanilla_pg, endpoint)
    assert vanilla_pg.safe_psql(
        "select sum(somedata) from replication_example"
    ) == endpoint.safe_psql("select sum(somedata) from replication_example")
# Test that WAL redo works for fairly large records.
#

View File

@@ -1,59 +0,0 @@
from contextlib import closing
import pytest
from psycopg2.errors import QueryCanceled
"""
Test that we can handle broken pageservers correctly
"""
def test_pageserver_breaks_while_running(neon_simple_env):
    """Detach a tenant under a running compute and check the error is logged.

    The test creates a tenant and a "test_config" branch, starts an endpoint
    on it, and populates a table while the tenant is still attached. It then
    detaches the tenant through the pageserver HTTP client, runs a query with
    a 1s ``statement_timeout`` and expects it to be cancelled. Finally it
    stops the endpoint and checks its log for the handshake error message.
    """
    env = neon_simple_env
    ps = env.pageserver
    ps_http = ps.http_client()
    ps_http.is_testing_enabled_or_skip()

    # Only the tenant id is needed; the timeline id is intentionally unused.
    (tid, _tlid) = env.neon_cli.create_tenant()
    env.neon_cli.create_branch("test_config", tenant_id=tid)
    env.pageserver.quiesce_tenants()

    # We don't want to have any racy behaviour with autovacuum IOs
    ep = env.endpoints.create_start(
        "test_config",
        tenant_id=tid,
        config_lines=[
            "autovacuum = off",
            "shared_buffers = 128MB",
        ],
    )

    # tenant is still attached, no errors from PS
    with closing(ep.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
CREATE TABLE test1 AS
SELECT id, sha256(id::text::bytea) payload
FROM generate_series(1, 1024::bigint) p(id);
"""
            )

        ps_http.tenant_detach(tid)

        with conn.cursor() as cur:
            cur.execute(
                """
SET statement_timeout = '1s';
"""
            )

            with pytest.raises(QueryCanceled):
                # definitely uncached relation
                cur.execute(
                    """
SELECT count(*) FROM pg_rewrite;
"""
                )

    ep.stop()
    # The result was previously discarded, so the log was never actually
    # checked. NOTE(review): assumes log_contains returns a truthy value on a
    # match and a falsy one otherwise -- confirm against the fixture.
    assert ep.log_contains("""could not complete handshake: PageServer returned error: """)