mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 04:52:55 +00:00
Remove the replication slot in test_snap_files at the end of the test
Analysis of the LR benchmarking tests indicates that during test_subscriber_lag, a leftover 'slotter' replication slot can cause retained WAL to grow on the publisher. This replication slot is not used by any subscriber. The only purpose of the slot is to generate snapshot files for the purpose of test_snap_files. Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import psycopg2
|
||||
@@ -18,7 +20,7 @@ if TYPE_CHECKING:
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.neon_api import NeonApiEndpoint
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres
|
||||
from psycopg2.extensions import cursor
|
||||
from psycopg2.extensions import connection, cursor
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@@ -292,6 +294,48 @@ def test_snap_files(
|
||||
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
|
||||
the node and tries to peek the replication changes.
|
||||
"""
|
||||
|
||||
@contextmanager
def replication_slot(conn: connection, slot_name: str) -> Iterator[None]:
    """
    Create a logical replication slot that lives only for the ``with`` body.

    On entry, any existing slot named ``slot_name`` is dropped and a fresh
    logical slot using the ``test_decoding`` plugin is created. On exit the
    slot is dropped again, including when the body raises, so that the slot
    cannot outlive the test.

    Normally we wouldn't want this behavior, but since the test creates and
    drops the replication slot, we do. We've had problems in the past where
    this slot sticking around caused issues with the publisher retaining WAL
    during the execution of the other benchmarks in this suite.

    Args:
        conn: open connection used for both creating and dropping the slot.
        slot_name: name of the replication slot to manage.
    """

    def _drop_replication_slot(c: cursor) -> None:
        """Drop ``slot_name`` if it is listed in ``pg_replication_slots``."""
        c.execute(
            """
            DO $$
            BEGIN
                IF EXISTS (
                    SELECT 1
                    FROM pg_replication_slots
                    WHERE slot_name = %(slot_name)s
                ) THEN
                    PERFORM pg_drop_replication_slot(%(slot_name)s);
                END IF;
            END $$;
            """,
            {"slot_name": slot_name},
        )

    with conn.cursor() as c:
        # Start from a clean state in case a previous run left the slot behind.
        _drop_replication_slot(c)
        c.execute(
            "SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')",
            {"slot_name": slot_name},
        )

    try:
        yield
    finally:
        # Without the finally, an exception raised inside the with-body would
        # skip the cleanup and leave the slot retaining WAL on the publisher.
        with conn.cursor() as c:
            _drop_replication_slot(c)
|
||||
|
||||
test_duration_min = 60
|
||||
test_interval_min = 5
|
||||
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
|
||||
@@ -314,48 +358,35 @@ def test_snap_files(
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_replication_slots
|
||||
WHERE slot_name = 'slotter'
|
||||
) THEN
|
||||
PERFORM pg_drop_replication_slot('slotter');
|
||||
END IF;
|
||||
END $$;
|
||||
"""
|
||||
with replication_slot(conn, "slotter"):
|
||||
workload = pg_bin.run_nonblocking(
|
||||
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env
|
||||
)
|
||||
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
|
||||
try:
|
||||
start = time.time()
|
||||
prev_measurement = time.time()
|
||||
while time.time() - start < test_duration_min * 60:
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
|
||||
)
|
||||
check_pgbench_still_running(workload)
|
||||
cur.execute(
|
||||
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
|
||||
)
|
||||
|
||||
conn.close()
|
||||
|
||||
# Measure storage
|
||||
if time.time() - prev_measurement > test_interval_min * 60:
|
||||
storage = benchmark_project_pub.get_synthetic_storage_size()
|
||||
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
|
||||
prev_measurement = time.time()
|
||||
time.sleep(test_interval_min * 60 / 3)
|
||||
finally:
|
||||
workload.terminate()
|
||||
|
||||
conn.close()
|
||||
|
||||
workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
|
||||
try:
|
||||
start = time.time()
|
||||
prev_measurement = time.time()
|
||||
while time.time() - start < test_duration_min * 60:
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
|
||||
)
|
||||
check_pgbench_still_running(workload)
|
||||
cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())")
|
||||
|
||||
conn.close()
|
||||
|
||||
# Measure storage
|
||||
if time.time() - prev_measurement > test_interval_min * 60:
|
||||
storage = benchmark_project_pub.get_synthetic_storage_size()
|
||||
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
|
||||
prev_measurement = time.time()
|
||||
time.sleep(test_interval_min * 60 / 3)
|
||||
|
||||
finally:
|
||||
workload.terminate()
|
||||
|
||||
Reference in New Issue
Block a user