mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 19:42:55 +00:00
Add test_multixact to check that we replay multixact and advance next_multixact_id correctly
This commit is contained in:
71
test_runner/batch_others/test_multixact.py
Normal file
71
test_runner/batch_others/test_multixact.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import pytest
|
||||
import os
|
||||
import psycopg2
|
||||
import multiprocessing
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
#
|
||||
# Test multixact state after branching
|
||||
# Now this test is very minimalistic -
|
||||
# it only checks next_multixact_id field in restored pg_control,
|
||||
# since we don't have functions to check multixact internals.
|
||||
#
|
||||
|
||||
def runQuery(connstr):
|
||||
con = psycopg2.connect(connstr)
|
||||
con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = con.cursor()
|
||||
cur.execute('select * from t1 for key share;')
|
||||
|
||||
|
||||
def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
|
||||
|
||||
# Create a branch for us
|
||||
zenith_cli.run(["branch", "test_multixact", "empty"])
|
||||
pg = postgres.create_start('test_multixact')
|
||||
|
||||
print("postgres is running on 'test_multixact' branch")
|
||||
pg_conn = psycopg2.connect(pg.connstr())
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute('CREATE TABLE t1(i int primary key);'
|
||||
'INSERT INTO t1 select * from generate_series(1,100);')
|
||||
|
||||
cur.execute('SELECT next_multixact_id FROM pg_control_checkpoint();')
|
||||
next_multixact_id_old = cur.fetchone()[0]
|
||||
|
||||
# Lock entries in parallel connections to set multixact
|
||||
nclients = 3
|
||||
pool = multiprocessing.Pool(nclients)
|
||||
args = [pg.connstr()] * nclients
|
||||
pool.map(runQuery, args)
|
||||
pool.close()
|
||||
pool.join()
|
||||
|
||||
# force wal flush
|
||||
cur.execute('checkpoint')
|
||||
|
||||
cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint();')
|
||||
res = cur.fetchone()
|
||||
next_multixact_id = res[0]
|
||||
lsn = res[1]
|
||||
|
||||
# Ensure that we did lock some tuples
|
||||
assert(int(next_multixact_id) > int(next_multixact_id_old))
|
||||
|
||||
# Branch at this point
|
||||
zenith_cli.run(["branch", "test_multixact_new", "test_multixact@"+lsn]);
|
||||
pg_new = postgres.create_start('test_multixact_new')
|
||||
|
||||
print("postgres is running on 'test_multixact_new' branch")
|
||||
pg_new_conn = psycopg2.connect(pg_new.connstr())
|
||||
pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur_new = pg_new_conn.cursor()
|
||||
|
||||
cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint();')
|
||||
next_multixact_id_new = cur_new.fetchone()[0]
|
||||
|
||||
# Check that we restored pg_controlfile correctly
|
||||
assert(next_multixact_id_new == next_multixact_id)
|
||||
Reference in New Issue
Block a user