mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 01:42:55 +00:00
Also, modify the "neon_local timeline import" command so that it doesn't create the endpoint anymore. I don't see any reason to bundle that into the same command; the "timeline create" and "timeline branch" commands don't do that either. I plan to add more tests similar to 'test_import_at_2bil', and this will help to reduce the copy-pasting.
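With that change, importing a timeline and starting a compute become two explicit steps. A minimal sketch of the resulting flow in a test, using the fixture helpers from the file below (illustrative only; branch_name stands in for whatever branch name a test picks):

    import_timeline_from_vanilla_postgres(
        test_output_dir, env, pg_bin, tenant_id, timeline_id, branch_name, vanilla_pg.connstr()
    )
    endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant_id)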
185 lines | 5.9 KiB | Python
import os
import time
from pathlib import Path

from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    PgBin,
    import_timeline_from_vanilla_postgres,
    wait_for_wal_insert_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import query_scalar


def test_next_xid(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()

    endpoint = env.endpoints.create_start("main")

    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("CREATE TABLE t(x integer)")

    iterations = 32
    for i in range(1, iterations + 1):
        print(f"iteration {i} / {iterations}")

        # Kill and restart the pageserver.
        endpoint.stop()
        env.pageserver.stop(immediate=True)
        env.pageserver.start()
        endpoint.start()

        retry_sleep = 0.5
        max_retries = 200
        retries = 0
        while True:
            try:
                conn = endpoint.connect()
                cur = conn.cursor()
                cur.execute(f"INSERT INTO t values({i})")
                conn.close()

            except Exception as error:
                # It's normal that it takes some time for the pageserver to
                # restart, and for the connection to fail until it does. It
                # should eventually recover, so retry until it succeeds.
                print(f"failed: {error}")
                if retries < max_retries:
                    retries += 1
                    print(f"retry {retries} / {max_retries}")
                    time.sleep(retry_sleep)
                    continue
                else:
                    raise
            break

    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("SELECT count(*) FROM t")
    assert cur.fetchone() == (iterations,)
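

# The retry dance above is the kind of copy-paste the commit message wants to
# reduce. A possible shared helper (hypothetical, not part of the fixtures):
# reconnect until the pageserver accepts connections again. Note that the test
# above also retries the INSERT itself, not just the connection attempt.
def connect_with_retries(endpoint, max_retries=200, retry_sleep=0.5):
    for attempt in range(1, max_retries + 1):
        try:
            return endpoint.connect()
        except Exception as error:
            print(f"connect failed: {error}, retry {attempt} / {max_retries}")
            time.sleep(retry_sleep)
    return endpoint.connect()  # final attempt; let any exception propagate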


# Test for a bug we had, where nextXid was incorrectly updated when the
# XID counter reached 2 billion. The nextXid tracking logic incorrectly
# treated 0 (InvalidTransactionId) as a regular XID, and after reaching
# 2 billion, it started to look like a very new XID, which caused nextXid
# to be immediately advanced to the next epoch.
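#
# An illustrative aside (a sketch, not the pageserver's actual code): with
# circular modulo-2^32 XID comparison, xid 0 compares as "newer" than nextXid
# as soon as nextXid passes 2^31, so an xl_xid of 0 in the WAL could push
# nextXid a whole epoch ahead unless special XIDs below
# FirstNormalTransactionId are skipped.
def xid_follows(xid_a: int, xid_b: int) -> bool:
    # a "follows" b if a is less than 2^31 steps ahead of b, wrapping around
    diff = (xid_a - xid_b) & 0xFFFFFFFF
    return 0 < diff < 2**31


assert not xid_follows(0, 2_129_920_000)  # below 2^31: xid 0 still looks old
assert xid_follows(0, 2**31 + 1)  # past 2^31: xid 0 suddenly looks brand new

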
def test_import_at_2bil(
    neon_env_builder: NeonEnvBuilder,
    test_output_dir: Path,
    pg_bin: PgBin,
    vanilla_pg,
):
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
    env = neon_env_builder.init_start()

    # Reset the vanilla Postgres instance to somewhat before 2 billion (2^31)
    # transactions.
    pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
    cmd = [pg_resetwal_path, "--next-transaction-id=2129920000", "-D", str(vanilla_pg.pgdatadir)]
    pg_bin.run_capture(cmd)

    vanilla_pg.start()
    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
    vanilla_pg.safe_psql(
        """create table tt as select 'long string to consume some space' || g
        from generate_series(1,300000) g"""
    )
    assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
    vanilla_pg.safe_psql("CREATE TABLE t (t text);")
    vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
    vanilla_pg.safe_psql("CHECKPOINT")

    tenant_id = TenantId.generate()
    env.pageserver.tenant_create(tenant_id)
    timeline_id = TimelineId.generate()

    # Import the cluster to Neon
    import_timeline_from_vanilla_postgres(
        test_output_dir,
        env,
        pg_bin,
        tenant_id,
        timeline_id,
        "imported_2bil_xids",
        vanilla_pg.connstr(),
    )
    vanilla_pg.stop()  # don't need the original server anymore

    # Check that it works
    endpoint = env.endpoints.create_start(
        "imported_2bil_xids",
        tenant_id=tenant_id,
        config_lines=[
            "log_autovacuum_min_duration = 0",
            "autovacuum_naptime='5 s'",
        ],
    )
    assert endpoint.safe_psql("select count(*) from t") == [(1,)]

    conn = endpoint.connect()
    cur = conn.cursor()

    # Install the extension containing the XID-consuming function needed below
    cur.execute("CREATE EXTENSION neon_test_utils")

    # Advance nextXid close to 2 billion (2^31) XIDs
    while True:
        xid = int(query_scalar(cur, "SELECT txid_current()"))
        log.info(f"xid now {xid}")
        # Consume XIDs in batches of 50k while far from the mark, then in
        # batches of 5k, stopping within 10k of 2^31 so we don't overshoot.
        if xid < 2 * 1024 * 1024 * 1024 - 100000:
            cur.execute("select test_consume_xids(50000);")
        elif xid < 2 * 1024 * 1024 * 1024 - 10000:
            cur.execute("select test_consume_xids(5000);")
        else:
            break

    # Run a bunch of real INSERTs to cross over the 2 billion mark.
    # Use a begin-exception block to have a separate sub-XID for each insert.
    cur.execute(
        """
        do $$
        begin
          for i in 1..10000 loop
            -- Use a begin-exception block to generate a new subtransaction on each iteration
            begin
              insert into t values (i);
            exception when others then
              raise 'not expected %', sqlerrm;
            end;
          end loop;
        end;
        $$;
        """
    )

    # Also create a multi-XID with members past the 2 billion mark. Two
    # concurrent FOR SHARE lockers on the same row force a MultiXactId.
    conn2 = endpoint.connect()
    cur2 = conn2.cursor()
    cur.execute("INSERT INTO t VALUES ('x')")
    cur.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
    cur2.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
    cur.execute("COMMIT")
    cur2.execute("COMMIT")

    # A checkpoint writes a WAL record with xl_xid=0. Many other WAL
    # records would have the same effect.
    cur.execute("checkpoint")

    # Wait until the pageserver has received that data
    wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)

    # Restart the endpoint
    endpoint.stop()
    endpoint.start()

    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("SELECT count(*) from t")
    # 10000 rows from the do-block, plus the one inserted in vanilla Postgres,
    # plus the 'x' row locked for the multi-XID.
    assert cur.fetchone() == (10000 + 1 + 1,)