Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-08 05:52:55 +00:00
Add tests with concurrent computes.

Removes test_restart_compute, as the added test_compute_restarts is stronger.
Committed by: Arseny Sher
Parent: 07df7c2edd
Commit: 8fabdc6708
@@ -1,74 +0,0 @@
-import pytest
-
-from contextlib import closing
-from fixtures.neon_fixtures import NeonEnvBuilder
-from fixtures.log_helper import log
-
-
-#
-# Test restarting and recreating a postgres instance
-#
-@pytest.mark.parametrize('with_safekeepers', [False, True])
-def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
-    neon_env_builder.auth_enabled = True
-    if with_safekeepers:
-        neon_env_builder.num_safekeepers = 3
-    env = neon_env_builder.init_start()
-
-    env.neon_cli.create_branch('test_restart_compute')
-    pg = env.postgres.create_start('test_restart_compute')
-    log.info("postgres is running on 'test_restart_compute' branch")
-
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            cur.execute('CREATE TABLE t(key int primary key, value text)')
-            cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
-            cur.execute('SELECT sum(key) FROM t')
-            r = cur.fetchone()
-            assert r == (5000050000, )
-            log.info(f"res = {r}")
-
-    # Remove data directory and restart
-    pg.stop_and_destroy().create_start('test_restart_compute')
-
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            # We can still see the row
-            cur.execute('SELECT sum(key) FROM t')
-            r = cur.fetchone()
-            assert r == (5000050000, )
-            log.info(f"res = {r}")
-
-            # Insert another row
-            cur.execute("INSERT INTO t VALUES (100001, 'payload2')")
-            cur.execute('SELECT count(*) FROM t')
-
-            r = cur.fetchone()
-            assert r == (100001, )
-            log.info(f"res = {r}")
-
-    # Again remove data directory and restart
-    pg.stop_and_destroy().create_start('test_restart_compute')
-
-    # That select causes lots of FPIs and increases the probability of safekeepers
-    # lagging behind after query completion
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            # We can still see the rows
-            cur.execute('SELECT count(*) FROM t')
-
-            r = cur.fetchone()
-            assert r == (100001, )
-            log.info(f"res = {r}")
-
-    # And again remove data directory and restart
-    pg.stop_and_destroy().create_start('test_restart_compute')
-
-    with closing(pg.connect()) as conn:
-        with conn.cursor() as cur:
-            # We can still see the rows
-            cur.execute('SELECT count(*) FROM t')
-
-            r = cur.fetchone()
-            assert r == (100001, )
-            log.info(f"res = {r}")
@@ -1,5 +1,6 @@
 import asyncio
+import uuid

 import asyncpg
 import random
 import time
@@ -7,7 +8,7 @@ import time
 from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
 from fixtures.log_helper import getLogger
 from fixtures.utils import lsn_from_hex, lsn_to_hex
-from typing import List
+from typing import List, Optional

 log = getLogger('root.safekeeper_async')

@@ -234,3 +235,156 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
     # we try to simulate a large (flush_lsn - truncate_lsn) lag, to test that WAL segments
     # are not removed before being broadcast to all safekeepers, with the help of a replication slot
     asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))
+
+
+def postgres_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
+    pg = Postgres(
+        env,
+        tenant_id=env.initial_tenant,
+        port=env.port_distributor.get_port(),
+        # In these tests the compute has a high probability of terminating on its own
+        # before our stop() due to lost consensus leadership.
+        check_stop_result=False)
+
+    # embed current time in node name
+    node_name = pgdir_name or f'pg_node_{time.time()}'
+    return pg.create_start(branch_name=branch,
+                           node_name=node_name,
+                           config_lines=['log_statement=all'])
+
+
+async def exec_compute_query(env: NeonEnv,
+                             branch: str,
+                             query: str,
+                             pgdir_name: Optional[str] = None):
+    with postgres_create_start(env, branch=branch, pgdir_name=pgdir_name) as pg:
+        before_conn = time.time()
+        conn = await pg.connect_async()
+        res = await conn.fetch(query)
+        await conn.close()
+        after_conn = time.time()
+        log.info(f'{query} took {after_conn - before_conn}s')
+        return res
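
Note: exec_compute_query is a coroutine, so synchronous test code has to drive it through an event loop. A minimal usage sketch, assuming an already initialized NeonEnv `env` and an existing branch (the branch name 'main' here is illustrative):

    # Hypothetical one-off call from synchronous test code.
    rows = asyncio.run(exec_compute_query(env, 'main', 'SELECT 1'))
    assert rows[0][0] == 1

Each such call spins up a fresh compute node, runs a single query over asyncpg, and tears the compute down again when the `with` block exits.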
+
+
+async def run_compute_restarts(env: NeonEnv,
+                               queries=16,
+                               batch_insert=10000,
+                               branch='test_compute_restarts'):
+    cnt = 0
+    sum = 0
+
+    await exec_compute_query(env, branch, 'CREATE TABLE t (i int)')
+
+    for i in range(queries):
+        if i % 4 == 0:
+            await exec_compute_query(
+                env, branch, f'INSERT INTO t SELECT 1 FROM generate_series(1, {batch_insert})')
+            sum += batch_insert
+            cnt += batch_insert
+        elif (i % 4 == 1) or (i % 4 == 3):
+            # Note that the select causes lots of FPIs and increases the probability
+            # of safekeepers standing at different LSNs after compute termination.
+            actual_sum = (await exec_compute_query(env, branch, 'SELECT SUM(i) FROM t'))[0][0]
+            assert actual_sum == sum, f'Expected sum={sum}, actual={actual_sum}'
+        elif i % 4 == 2:
+            await exec_compute_query(env, branch, 'UPDATE t SET i = i + 1')
+            sum += cnt
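
Note: the i % 4 schedule interleaves one INSERT, two verifying SELECTs, and one UPDATE per cycle of four queries, each against a brand-new compute. The bookkeeping can be checked by hand; a minimal sketch of the expected trajectory with the defaults queries=16 and batch_insert=10000:

    # Simulate the counters run_compute_restarts maintains (no database needed).
    batch_insert = 10000
    cnt = expected_sum = 0
    for i in range(16):
        if i % 4 == 0:      # INSERT adds batch_insert ones
            cnt += batch_insert
            expected_sum += batch_insert
        elif i % 4 == 2:    # UPDATE increments every existing row by 1
            expected_sum += cnt
    # Per cycle: 10000+10000, 20000+20000, 30000+30000, 40000+40000 = 200000 total.
    assert (cnt, expected_sum) == (40000, 200000)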
+
+
+# Add a test which creates a compute for every query, and destroys it right after.
+def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch('test_compute_restarts')
+    asyncio.run(run_compute_restarts(env))
+
+
+class BackgroundCompute(object):
+    def __init__(self, index: int, env: NeonEnv, branch: str):
+        self.index = index
+        self.env = env
+        self.branch = branch
+        self.running = False
+        self.stopped = False
+        self.total_tries = 0
+        self.successful_queries: List[int] = []
+
+    async def run(self):
+        if self.running:
+            raise Exception('BackgroundCompute is already running')
+
+        self.running = True
+        i = 0
+        while not self.stopped:
+            try:
+                verify_key = (self.index << 16) + i
+                i += 1
+                self.total_tries += 1
+                res = await exec_compute_query(
+                    self.env,
+                    self.branch,
+                    f'INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key',
+                    pgdir_name=f'bgcompute{self.index}_key{verify_key}',
+                )
+                log.info(f'result: {res}')
+                if len(res) != 1:
+                    raise Exception('No result returned')
+                if res[0][0] != verify_key:
+                    raise Exception('Wrong result returned')
+                self.successful_queries.append(verify_key)
+            except Exception as e:
+                log.info(f'BackgroundCompute {self.index} query failed: {e}')
+
+            # With a shorter sleep there is a high chance of committing nothing,
+            # or only one xact, during the test run.
+            await asyncio.sleep(2 * random.random())
+        self.running = False
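
Note: verify_key packs the compute's index into the high bits and the per-compute attempt counter into the low 16 bits, so every committed row identifies both who wrote it and on which try. A small sketch of the packing and its inverse:

    # (index << 16) + i is unique across computes as long as i stays below 2**16,
    # which a 20-second test run cannot come close to exhausting.
    index, i = 3, 5
    verify_key = (index << 16) + i        # 196613
    assert verify_key >> 16 == index      # recover the compute index
    assert verify_key & 0xFFFF == i       # recover the attempt counter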
+
+
+async def run_concurrent_computes(env: NeonEnv,
+                                  num_computes=10,
+                                  run_seconds=20,
+                                  branch='test_concurrent_computes'):
+    await exec_compute_query(
+        env,
+        branch,
+        'CREATE TABLE query_log (t timestamp default now(), index int, verify_key int)')
+
+    computes = [BackgroundCompute(i, env, branch) for i in range(num_computes)]
+    background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
+
+    await asyncio.sleep(run_seconds)
+    for compute in computes[1:]:
+        compute.stopped = True
+    log.info("stopped all tasks but one")
+
+    # work for some time with only one compute -- it should be able to make some xacts
+    await asyncio.sleep(8)
+    computes[0].stopped = True
+
+    await asyncio.gather(*background_tasks)
+
+    result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
+    # we should have inserted something while the single compute was running
+    assert len(result) >= 4
+    log.info(f'Executed {len(result)} queries')
+    for row in result:
+        log.info(f'{row[0]} {row[1]} {row[2]}')
+
+    # ensure everything reported as committed wasn't lost
+    for compute in computes:
+        for verify_key in compute.successful_queries:
+            assert verify_key in [row[2] for row in result]
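
Note: the final membership test rebuilds the list of committed keys on every iteration, making the verification quadratic in the number of rows. A sketch of an equivalent linear-time check using a set:

    committed_keys = {row[2] for row in result}
    for compute in computes:
        for verify_key in compute.successful_queries:
            assert verify_key in committed_keys, f'committed key {verify_key} was lost'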
+
+
+# Run multiple computes concurrently, creating and destroying each one after a
+# single query. Ensure we don't lose any xacts reported as committed and that
+# we can make progress once only one compute remains.
+def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch('test_concurrent_computes')
+    asyncio.run(run_concurrent_computes(env))
@@ -1160,6 +1160,7 @@ class NeonCli:
                 node_name: str,
                 tenant_id: Optional[uuid.UUID] = None,
                 destroy=False,
+                check_return_code=True,
                 ) -> 'subprocess.CompletedProcess[str]':
         args = [
             'pg',
@@ -1172,7 +1173,7 @@
         if node_name is not None:
             args.append(node_name)

-        return self.raw_cli(args)
+        return self.raw_cli(args, check_return_code=check_return_code)

     def raw_cli(self,
                 arguments: List[str],
@@ -1188,6 +1189,8 @@
        >>> result = env.neon_cli.raw_cli(...)
        >>> assert result.stderr == ""
        >>> log.info(result.stdout)
+
+        If `check_return_code` is set, a non-zero exit code logs the failure and raises.
        """

        assert type(arguments) == list
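
Note: with this flag the caller decides whether a non-zero exit code is fatal. A hedged sketch of the intended calling pattern (the argument list here is illustrative, not the exact CLI invocation that pg_stop builds):

    # Hypothetical direct use: tolerate failure when the compute may already be gone.
    res = env.neon_cli.raw_cli(['pg', 'stop', node_name], check_return_code=False)
    if res.returncode != 0:
        log.info(f'pg stop exited with {res.returncode}; compute had likely terminated already')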
@@ -1213,27 +1216,27 @@
             env_vars[var] = val

-        # Intercept CalledProcessError and print more info
-        try:
-            res = subprocess.run(args,
-                                 env=env_vars,
-                                 check=True,
-                                 universal_newlines=True,
-                                 stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
+        res = subprocess.run(args,
+                             env=env_vars,
+                             check=False,
+                             universal_newlines=True,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        if not res.returncode:
             log.info(f"Run success: {res.stdout}")
-        except subprocess.CalledProcessError as exc:
+        elif check_return_code:
             # this way command output will be recorded and shown in CI in the failure message
             msg = f"""\
-            Run failed: {exc}
-            stdout: {exc.stdout}
-            stderr: {exc.stderr}
+            Run {res.args} failed:
+            stdout: {res.stdout}
+            stderr: {res.stderr}
             """
             log.info(msg)
-            raise Exception(msg) from exc
-
-        if check_return_code:
-            res.check_returncode()
+            raise Exception(msg) from subprocess.CalledProcessError(res.returncode,
+                                                                    res.args,
+                                                                    res.stdout,
+                                                                    res.stderr)

         return res
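
Note: the rewrite replaces "let subprocess raise and intercept CalledProcessError" with "run with check=False and decide afterwards", which is what lets check_return_code=False callers get the CompletedProcess back instead of an exception. The pattern in isolation, as a self-contained sketch (not the fixture code itself):

    import subprocess

    def run_tolerant(args, check_return_code=True):
        # check=False: never raises on non-zero exit; inspect returncode ourselves.
        res = subprocess.run(args, universal_newlines=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
        if res.returncode and check_return_code:
            # Chain from CalledProcessError so tracebacks keep the exit status and output.
            raise Exception(f'{res.args} failed: {res.stderr}') from \
                subprocess.CalledProcessError(res.returncode, res.args, res.stdout, res.stderr)
        return res

    res = run_tolerant(['false'], check_return_code=False)  # no exception raised
    assert res.returncode == 1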
@@ -1526,7 +1529,11 @@ def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:

 class Postgres(PgProtocol):
     """ An object representing a running postgres daemon. """
-    def __init__(self, env: NeonEnv, tenant_id: uuid.UUID, port: int):
+    def __init__(self,
+                 env: NeonEnv,
+                 tenant_id: uuid.UUID,
+                 port: int,
+                 check_stop_result: bool = True):
         super().__init__(host='localhost', port=port, user='cloud_admin', dbname='postgres')
         self.env = env
         self.running = False
@@ -1534,6 +1541,7 @@ class Postgres(PgProtocol):
         self.pgdata_dir: Optional[str] = None  # Path to computenode PGDATA
         self.tenant_id = tenant_id
         self.port = port
+        self.check_stop_result = check_stop_result
         # path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf

     def create(
@@ -1585,8 +1593,6 @@ class Postgres(PgProtocol):
                            port=self.port)
         self.running = True

-        log.info(f"stdout: {run_result.stdout}")
-
         return self

     def pg_data_dir_path(self) -> str:
@@ -1650,7 +1656,9 @@

         if self.running:
             assert self.node_name is not None
-            self.env.neon_cli.pg_stop(self.node_name, self.tenant_id)
+            self.env.neon_cli.pg_stop(self.node_name,
+                                      self.tenant_id,
+                                      check_return_code=self.check_stop_result)
             self.running = False

         return self
@@ -1662,7 +1670,10 @@
         """

         assert self.node_name is not None
-        self.env.neon_cli.pg_stop(self.node_name, self.tenant_id, True)
+        self.env.neon_cli.pg_stop(self.node_name,
+                                  self.tenant_id,
+                                  True,
+                                  check_return_code=self.check_stop_result)
         self.node_name = None
         self.running = False
@@ -1681,6 +1692,8 @@
         Returns self.
         """

+        started_at = time.time()
+
         self.create(
             branch_name=branch_name,
             node_name=node_name,
@@ -1688,6 +1701,8 @@
             lsn=lsn,
         ).start()

+        log.info(f"Postgres startup took {time.time() - started_at} seconds")
+
         return self

     def __enter__(self):
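
Note: the hunk above is truncated at Postgres.__enter__, which is what makes the `with postgres_create_start(...) as pg:` pattern in the new tests work. A hedged sketch of the presumable protocol (the __exit__ body is an assumption inferred from how the tests use it, not a verbatim quote of the fixture):

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Tear the compute down even if the query failed; with
        # check_stop_result=False a compute that already died on its own
        # does not turn teardown into a second error.
        self.stop_and_destroy()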