neon/test_runner/batch_others/test_multixact.py

import pytest
import os
import psycopg2
import multiprocessing

pytest_plugins = ("fixtures.zenith_fixtures")
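

# Each worker connection takes a FOR KEY SHARE lock on every row of t1.
# When more than one transaction holds a row-level lock on the same tuple,
# PostgreSQL records the lockers in a multixact, which is the state this
# test exercises.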
def runQuery(connstr):
    con = psycopg2.connect(connstr)
    con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    cur.execute('select * from t1 for key share;')
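

# Test that the multixact state in the control file (next_multixact_id)
# is carried over when branching at a specific LSN.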
def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
    # Create a branch for us
    zenith_cli.run(["branch", "test_multixact", "empty"])

    pg = postgres.create_start('test_multixact')
    print("postgres is running on 'test_multixact' branch")

    pg_conn = psycopg2.connect(pg.connstr())
    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = pg_conn.cursor()

    cur.execute('CREATE TABLE t1(i int primary key);'
                'INSERT INTO t1 select * from generate_series(1,100);')

    # Lock entries in parallel connections to set multixact
    nclients = 3
    pool = multiprocessing.Pool(nclients)
    args = [pg.connstr()] * nclients
    pool.map(runQuery, args)
    pool.close()
    pool.join()

    # force wal flush
    cur.execute('checkpoint')
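
    # pg_control_checkpoint() exposes the control file contents, including
    # the next multixact ID recorded by the checkpoint above.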
    cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint();')
    res = cur.fetchone()
    next_multixact_id = res[0]
    lsn = res[1]

    # Ensure that we did lock some tuples
    assert(int(next_multixact_id) > 1)

    # Branch at this point
    zenith_cli.run(["branch", "test_multixact_new", "test_multixact@" + lsn])

    pg_new = postgres.create_start('test_multixact_new')
    print("postgres is running on 'test_multixact_new' branch")

    pg_new_conn = psycopg2.connect(pg_new.connstr())
    pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur_new = pg_new_conn.cursor()

    cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint();')
    next_multixact_id_new = cur_new.fetchone()[0]

    # Check that we restored pg_controlfile correctly
    # TODO compare content of pg_multixact files?
    assert(next_multixact_id_new == next_multixact_id)