Fix autocommit footguns in performance tests

psycopg2 has the following warning related to autocommit:

> By default, any query execution, including a simple SELECT will start
> a transaction: for long-running programs, if no further action is
> taken, the session will remain “idle in transaction”, an undesirable
> condition for several reasons (locks are held by the session, tables
> bloat…). For long lived scripts, either ensure to terminate a
> transaction as soon as possible or use an autocommit connection.

In the 2.9 release notes, psycopg2 also made the following change:

> `with connection` starts a transaction on autocommit transactions too

Some of these connections are indeed long-lived, so we were retaining
tons of WAL on the endpoints because we had a transaction pinned in the
past.

Link: https://www.psycopg.org/docs/news.html#what-s-new-in-psycopg-2-9
Link: https://github.com/psycopg/psycopg2/issues/941
Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
Tristan Partin
2024-11-12 15:48:19 -06:00
committed by GitHub
parent 2256a5727a
commit d8f5d43549
2 changed files with 98 additions and 74 deletions

View File

@@ -149,12 +149,16 @@ def test_subscriber_lag(
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
with (
psycopg2.connect(pub_connstr) as pub_conn,
psycopg2.connect(sub_connstr) as sub_conn,
):
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
@@ -206,6 +210,7 @@ def test_publisher_restart(
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'")
pub_exists = len(pub_cur.fetchall()) != 0
@@ -222,6 +227,7 @@ def test_publisher_restart(
sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1")
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
@@ -248,12 +254,17 @@ def test_publisher_restart(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=pub_env,
)
with (
psycopg2.connect(pub_connstr) as pub_conn,
psycopg2.connect(sub_connstr) as sub_conn,
):
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
@@ -288,58 +299,56 @@ def test_snap_files(
env = benchmark_project_pub.pgbench_env
connstr = benchmark_project_pub.connstr
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cast("bool", cur.fetchall()[0][0])
assert is_super, "This benchmark won't work if we don't have superuser"
conn = psycopg2.connect(connstr)
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cast("bool", cur.fetchall()[0][0])
assert is_super, "This benchmark won't work if we don't have superuser"
conn.close()
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=env)
conn = psycopg2.connect(connstr)
conn.autocommit = True
cur = conn.cursor()
cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1")
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT pg_reload_conf()")
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
with conn.cursor() as cur:
cur.execute(
"""
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
"""
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
conn.close()
workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
with psycopg2.connect(connstr) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute(
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
)
conn = psycopg2.connect(connstr)
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())")
conn.close()
# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:

View File

@@ -102,15 +102,21 @@ def test_ro_replica_lag(
check_pgbench_still_running(master_workload)
check_pgbench_still_running(replica_workload)
time.sleep(sync_interval_min * 60)
conn_master = psycopg2.connect(master_connstr)
conn_replica = psycopg2.connect(replica_connstr)
conn_master.autocommit = True
conn_replica.autocommit = True
with (
psycopg2.connect(master_connstr) as conn_master,
psycopg2.connect(replica_connstr) as conn_replica,
conn_master.cursor() as cur_master,
conn_replica.cursor() as cur_replica,
):
with (
conn_master.cursor() as cur_master,
conn_replica.cursor() as cur_replica,
):
lag = measure_replication_lag(cur_master, cur_replica)
lag = measure_replication_lag(cur_master, cur_replica)
conn_master.close()
conn_replica.close()
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
finally:
@@ -219,11 +225,15 @@ def test_replication_start_stop(
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10"], env=master_env)
# Sync replicas
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for i in range(num_replicas):
conn_replica = psycopg2.connect(replica_connstr[i])
measure_replication_lag(cur_master, conn_replica.cursor())
conn_master = psycopg2.connect(master_connstr)
conn_master.autocommit = True
with conn_master.cursor() as cur_master:
for i in range(num_replicas):
conn_replica = psycopg2.connect(replica_connstr[i])
measure_replication_lag(cur_master, conn_replica.cursor())
conn_master.close()
master_pgbench = pg_bin.run_nonblocking(
[
@@ -277,17 +287,22 @@ def test_replication_start_stop(
time.sleep(configuration_test_time_sec)
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for ireplica in range(num_replicas):
replica_conn = psycopg2.connect(replica_connstr[ireplica])
lag = measure_replication_lag(cur_master, replica_conn.cursor())
zenbenchmark.record(
f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER
)
log.info(
f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}"
)
conn_master = psycopg2.connect(master_connstr)
conn_master.autocommit = True
with conn_master.cursor() as cur_master:
for ireplica in range(num_replicas):
replica_conn = psycopg2.connect(replica_connstr[ireplica])
lag = measure_replication_lag(cur_master, replica_conn.cursor())
zenbenchmark.record(
f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER
)
log.info(
f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}"
)
conn_master.close()
master_pgbench.terminate()
except Exception as e:
error_occurred = True