Modify test_multixact to reproduce the problem with incorrect offset calculation

This commit is contained in:
Konstantin Knizhnik
2023-07-21 22:16:42 +03:00
parent bb18c9958a
commit 10774a5132

View File

@@ -1,3 +1,7 @@
import random
import threading
from threading import Thread
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.utils import query_scalar
@@ -15,11 +19,17 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
endpoint = env.endpoints.create_start("test_multixact")
log.info("postgres is running on 'test_multixact' branch")
n_records = 100
n_threads = 5
n_iters = 1000
n_restarts = 10
cur = endpoint.connect().cursor()
cur.execute(
"""
CREATE TABLE t1(i int primary key);
INSERT INTO t1 select * from generate_series(1, 100);
f"""
CREATE TABLE t1(pk int primary key, val integer);
INSERT INTO t1 values (generate_series(1, {n_records}), 0);
"""
)
@@ -28,26 +38,32 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
)
# Lock entries using parallel connections in a round-robin fashion.
nclients = 20
connections = []
for i in range(nclients):
# Do not turn on autocommit. We want to hold the key-share locks.
def do_updates():
conn = endpoint.connect(autocommit=False)
connections.append(conn)
for i in range(n_iters):
pk = random.randrange(1, n_records)
conn.cursor().execute(f"update t1 set val=val+1 where pk={pk}")
conn.cursor().execute("select * from t1 for key share")
conn.commit()
conn.close()
# On each iteration, we commit the previous transaction on a connection,
# and issue antoher select. Each SELECT generates a new multixact that
# includes the new XID, and the XIDs of all the other parallel transactions.
# This generates enough traffic on both multixact offsets and members SLRUs
# to cross page boundaries.
for i in range(5000):
conn = connections[i % nclients]
conn.commit()
conn.cursor().execute("select * from t1 for key share")
for iter in range(n_restarts):
threads: List[threading.Thread] = []
for i in range(n_threads):
threads.append(threading.Thread(target=do_updates, args=(), daemon=False))
threads[-1].start()
# We have multixacts now. We can close the connections.
for c in connections:
c.close()
for thread in threads:
thread.join()
# Restart endpoint
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("select count(*) from t1")
assert cur.fetchone() == (n_records,)
# force wal flush
cur.execute("checkpoint")
@@ -74,6 +90,3 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
# Check that we restored pg_controlfile correctly
assert next_multixact_id_new == next_multixact_id
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)