mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
Add logical replication test to exercise snapfiles (#8364)
This commit is contained in:
@@ -262,3 +262,85 @@ def test_publisher_restart(
|
||||
sub_workload.terminate()
|
||||
finally:
|
||||
pub_workload.terminate()
|
||||
|
||||
|
||||
@pytest.mark.remote_cluster
|
||||
@pytest.mark.timeout(2 * 60 * 60)
|
||||
def test_snap_files(
|
||||
pg_bin: PgBin,
|
||||
benchmark_project_pub: NeonApiEndpoint,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
):
|
||||
"""
|
||||
Creates a node with a replication slot. Generates pgbench into the replication slot,
|
||||
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
|
||||
the node and tries to peek the replication changes.
|
||||
"""
|
||||
test_duration_min = 60
|
||||
test_interval_min = 5
|
||||
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
|
||||
|
||||
env = benchmark_project_pub.pgbench_env
|
||||
connstr = benchmark_project_pub.connstr
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env)
|
||||
|
||||
with psycopg2.connect(connstr) as conn:
|
||||
conn.autocommit = True
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
|
||||
is_super = cur.fetchall()[0]
|
||||
assert is_super, "This benchmark won't work if we don't have superuser"
|
||||
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
cur = conn.cursor()
|
||||
cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1")
|
||||
|
||||
with psycopg2.connect(connstr) as conn:
|
||||
conn.autocommit = True
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT pg_reload_conf()")
|
||||
|
||||
with psycopg2.connect(connstr) as conn:
|
||||
conn.autocommit = True
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_replication_slots
|
||||
WHERE slot_name = 'slotter'
|
||||
) THEN
|
||||
PERFORM pg_drop_replication_slot('slotter');
|
||||
END IF;
|
||||
END $$;
|
||||
"""
|
||||
)
|
||||
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
|
||||
|
||||
workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
|
||||
try:
|
||||
start = time.time()
|
||||
prev_measurement = time.time()
|
||||
while time.time() - start < test_duration_min * 60:
|
||||
with psycopg2.connect(connstr) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
|
||||
)
|
||||
check_pgbench_still_running(workload)
|
||||
cur.execute(
|
||||
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
|
||||
)
|
||||
|
||||
# Measure storage
|
||||
if time.time() - prev_measurement > test_interval_min * 60:
|
||||
storage = benchmark_project_pub.get_synthetic_storage_size()
|
||||
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
|
||||
prev_measurement = time.time()
|
||||
time.sleep(test_interval_min * 60 / 3)
|
||||
|
||||
finally:
|
||||
workload.terminate()
|
||||
|
||||
Reference in New Issue
Block a user