neon/test_runner/batch_others/test_wal_acceptor_async.py
Arthur Petukhovsky 116ecdf87a Improve walreceiver logic (#2253)
This patch makes the walreceiver logic more complicated, but it should work better in most cases. It adds `test_wal_lagging` to test scenarios where alive safekeepers can lag behind other alive safekeepers.

- There was a bug where the filter `etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())` excluded all safekeepers in some strange cases. This filter is removed, which should probably help with #2237.
- `walreceiver_connection` now reports its status, including `commit_lsn`. This allows keeping the safekeeper connection alive even when etcd is down.
- The safekeeper connection now fails if the pageserver doesn't receive safekeeper messages for some time; normally a safekeeper sends messages at least once per second.
- The `LaggingWal` check now uses `commit_lsn` taken directly from the safekeeper. This fixes frequent reconnects when the compute generates WAL very fast.
- `NoWalTimeout` is rewritten to trigger only when we know about new WAL and the connected safekeeper doesn't stream any of it. This allows setting a small `lagging_wal_timeout`, because the check fires only when we observe that the connected safekeeper is stuck (see the sketch below).
2022-08-15 13:31:26 +03:00
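
To make the switching rules above concrete, here is a minimal Python sketch of the decision logic, assuming illustrative names (`SafekeeperStatus`, `should_switch`, `lagging_wal_threshold`, `no_message_timeout`); the actual walreceiver lives in the pageserver's Rust code and is organized differently.

# Hypothetical sketch only; field and parameter names are illustrative and do not
# mirror the real pageserver implementation.
import time
from dataclasses import dataclass
from typing import List

@dataclass
class SafekeeperStatus:
    commit_lsn: int         # commit_lsn reported by the safekeeper itself
    streamed_lsn: int       # last LSN actually streamed to the pageserver
    last_message_at: float  # when the last safekeeper message was received
    last_wal_at: float      # when WAL was last received over this connection

def should_switch(current: SafekeeperStatus,
                  candidates: List[SafekeeperStatus],
                  lagging_wal_threshold: int,
                  no_message_timeout: float,
                  lagging_wal_timeout: float) -> bool:
    now = time.monotonic()
    # Connection failure: safekeepers normally send a message at least once per
    # second, so a long silence means the connection is broken.
    if now - current.last_message_at > no_message_timeout:
        return True
    best_commit_lsn = max((c.commit_lsn for c in candidates), default=current.commit_lsn)
    # LaggingWal: another safekeeper has committed much more WAL than the one we
    # are connected to (commit_lsn is taken from safekeeper status directly).
    if best_commit_lsn - current.commit_lsn > lagging_wal_threshold:
        return True
    # NoWalTimeout: newer WAL is known to exist, but the connected safekeeper has
    # not streamed anything for lagging_wal_timeout seconds.
    if best_commit_lsn > current.streamed_lsn and now - current.last_wal_at > lagging_wal_timeout:
        return True
    return False

The property that matters for `test_wal_lagging` is the NoWalTimeout branch: it fires only when newer WAL is known to exist, so a small `lagging_wal_timeout` does not cause spurious reconnects on an idle timeline.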


import asyncio
import uuid
import asyncpg
import random
import time
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List, Optional
from dataclasses import dataclass
log = getLogger('root.safekeeper_async')
class BankClient(object):
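    """Helper around a simple bank schema: `bank_accs` with fixed balances plus a `bank_log` of transfers."""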
def __init__(self, conn: asyncpg.Connection, n_accounts, init_amount):
self.conn: asyncpg.Connection = conn
self.n_accounts = n_accounts
self.init_amount = init_amount
async def initdb(self):
await self.conn.execute('DROP TABLE IF EXISTS bank_accs')
await self.conn.execute('CREATE TABLE bank_accs(uid int primary key, amount int)')
await self.conn.execute(
'''
INSERT INTO bank_accs
SELECT *, $1 FROM generate_series(0, $2)
''',
self.init_amount,
self.n_accounts - 1)
await self.conn.execute('DROP TABLE IF EXISTS bank_log')
await self.conn.execute('CREATE TABLE bank_log(from_uid int, to_uid int, amount int)')
async def check_invariant(self):
row = await self.conn.fetchrow('SELECT sum(amount) AS sum FROM bank_accs')
assert row['sum'] == self.n_accounts * self.init_amount
async def bank_transfer(conn: asyncpg.Connection, from_uid, to_uid, amount):
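    """Transfer `amount` from one account to another in a single transaction and record it in bank_log."""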
# avoid deadlocks by sorting uids
if from_uid > to_uid:
from_uid, to_uid, amount = to_uid, from_uid, -amount
async with conn.transaction():
await conn.execute(
'UPDATE bank_accs SET amount = amount + ($1) WHERE uid = $2',
amount,
to_uid,
)
await conn.execute(
'UPDATE bank_accs SET amount = amount - ($1) WHERE uid = $2',
amount,
from_uid,
)
await conn.execute(
'INSERT INTO bank_log VALUES ($1, $2, $3)',
from_uid,
to_uid,
amount,
)
class WorkerStats(object):
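    """Per-worker transaction counters, used to check that every worker makes progress."""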
def __init__(self, n_workers):
self.counters = [0] * n_workers
self.running = True
def reset(self):
self.counters = [0] * len(self.counters)
def inc_progress(self, worker_id):
self.counters[worker_id] += 1
def check_progress(self):
log.debug("Workers progress: {}".format(self.counters))
# every worker should finish at least one tx
assert all(cnt > 0 for cnt in self.counters)
progress = sum(self.counters)
log.info('All workers made {} transactions'.format(progress))
async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accounts, max_transfer):
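    """Execute random bank transfers in a loop until `stats.running` becomes False."""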
pg_conn = await pg.connect_async()
log.debug('Started worker {}'.format(worker_id))
while stats.running:
from_uid = random.randint(0, n_accounts - 1)
to_uid = (from_uid + random.randint(1, n_accounts - 1)) % n_accounts
amount = random.randint(1, max_transfer)
await bank_transfer(pg_conn, from_uid, to_uid, amount)
stats.inc_progress(worker_id)
log.debug('Executed transfer({}) {} => {}'.format(amount, from_uid, to_uid))
log.debug('Finished worker {}'.format(worker_id))
await pg_conn.close()
async def wait_for_lsn(safekeeper: Safekeeper,
tenant_id: str,
timeline_id: str,
wait_lsn: str,
polling_interval=1,
timeout=60):
"""
    Poll flush_lsn from the safekeeper until it is greater than or equal to
    the provided wait_lsn. To do that, timeline_status is fetched from the
    safekeeper every polling_interval seconds.
"""
started_at = time.time()
client = safekeeper.http_client()
flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
log.info(
f'Safekeeper at port {safekeeper.port.pg} has flush_lsn {flush_lsn}, waiting for lsn {wait_lsn}'
)
while lsn_from_hex(wait_lsn) > lsn_from_hex(flush_lsn):
elapsed = time.time() - started_at
if elapsed > timeout:
raise RuntimeError(
f"timed out waiting for safekeeper at port {safekeeper.port.pg} to reach {wait_lsn}, current lsn is {flush_lsn}"
)
await asyncio.sleep(polling_interval)
flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
log.debug(f'safekeeper port={safekeeper.port.pg} flush_lsn={flush_lsn} wait_lsn={wait_lsn}')
# This test will run several iterations and check progress in each of them.
# On each iteration one acceptor is stopped, and the two others should allow
# background workers to execute transactions. In the end, the state should
# remain consistent.
async def run_restarts_under_load(env: NeonEnv,
pg: Postgres,
acceptors: List[Safekeeper],
n_workers=10,
n_accounts=100,
init_amount=100000,
max_transfer=100,
period_time=4,
iterations=10):
    # Set the timeout for this test at 5 minutes. It should be enough for the test to complete,
    # taking into account that this timeout is checked only at the beginning of every iteration.
test_timeout_at = time.monotonic() + 5 * 60
pg_conn = await pg.connect_async()
tenant_id = await pg_conn.fetchval("show neon.tenant_id")
timeline_id = await pg_conn.fetchval("show neon.timeline_id")
bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
# create tables and initial balances
await bank.initdb()
stats = WorkerStats(n_workers)
workers = []
for worker_id in range(n_workers):
worker = run_random_worker(stats, pg, worker_id, bank.n_accounts, max_transfer)
workers.append(asyncio.create_task(worker))
for it in range(iterations):
assert time.monotonic() < test_timeout_at, 'test timed out'
victim_idx = it % len(acceptors)
victim = acceptors[victim_idx]
victim.stop()
flush_lsn = await pg_conn.fetchval('SELECT pg_current_wal_flush_lsn()')
flush_lsn = lsn_to_hex(flush_lsn)
log.info(f'Postgres flush_lsn {flush_lsn}')
pageserver_lsn = env.pageserver.http_client().timeline_detail(
            uuid.UUID(tenant_id), uuid.UUID(timeline_id))["local"]["last_record_lsn"]
sk_ps_lag = lsn_from_hex(flush_lsn) - lsn_from_hex(pageserver_lsn)
log.info(f'Pageserver last_record_lsn={pageserver_lsn} lag={sk_ps_lag / 1024}kb')
# Wait until alive safekeepers catch up with postgres
for idx, safekeeper in enumerate(acceptors):
if idx != victim_idx:
await wait_for_lsn(safekeeper, tenant_id, timeline_id, flush_lsn)
stats.reset()
await asyncio.sleep(period_time)
# assert that at least one transaction has completed in every worker
stats.check_progress()
victim.start()
log.info('Iterations are finished, exiting coroutines...')
stats.running = False
# await all workers
await asyncio.gather(*workers)
# assert balances sum hasn't changed
await bank.check_invariant()
await pg_conn.close()
# Restart acceptors one by one, while executing and validating bank transactions
def test_restarts_under_load(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_restarts_under_load')
# Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long
pg = env.postgres.create_start('test_safekeepers_restarts_under_load',
config_lines=['max_replication_write_lag=1MB'])
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers))
# Restart acceptors one by one and test that everything works as expected
# when checkpoints are triggered frequently by max_wal_size=32MB. Because we have
# wal_keep_size=0, WAL segments are recycled aggressively.
def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_restarts_frequent_checkpoints')
# Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long
pg = env.postgres.create_start('test_restarts_frequent_checkpoints',
config_lines=[
'max_replication_write_lag=1MB',
'min_wal_size=32MB',
'max_wal_size=32MB',
'log_checkpoints=on'
])
    # We try to simulate a large (flush_lsn - truncate_lsn) lag to test that WAL segments
    # are not removed before being broadcast to all safekeepers, with the help of the replication slot.
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5))
def postgres_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
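    """Create and start a compute node, tolerating non-clean compute shutdowns."""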
pg = Postgres(
env,
tenant_id=env.initial_tenant,
port=env.port_distributor.get_port(),
        # In these tests the compute has a high probability of terminating on its own
        # before our stop() call due to lost consensus leadership.
check_stop_result=False)
# embed current time in node name
node_name = pgdir_name or f'pg_node_{time.time()}'
return pg.create_start(branch_name=branch,
node_name=node_name,
config_lines=['log_statement=all'])
async def exec_compute_query(env: NeonEnv,
branch: str,
query: str,
pgdir_name: Optional[str] = None):
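    """Start a fresh compute, execute a single query on it, and stop the compute."""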
with postgres_create_start(env, branch=branch, pgdir_name=pgdir_name) as pg:
before_conn = time.time()
conn = await pg.connect_async()
res = await conn.fetch(query)
await conn.close()
after_conn = time.time()
log.info(f'{query} took {after_conn - before_conn}s')
return res
async def run_compute_restarts(env: NeonEnv,
queries=16,
batch_insert=10000,
branch='test_compute_restarts'):
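    """Run a sequence of INSERT/SELECT/UPDATE queries, each one on a freshly created compute."""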
cnt = 0
sum = 0
await exec_compute_query(env, branch, 'CREATE TABLE t (i int)')
for i in range(queries):
if i % 4 == 0:
await exec_compute_query(
env, branch, f'INSERT INTO t SELECT 1 FROM generate_series(1, {batch_insert})')
sum += batch_insert
cnt += batch_insert
elif (i % 4 == 1) or (i % 4 == 3):
            # Note that SELECT causes lots of FPIs and increases the probability of safekeepers
            # ending up at different LSNs after compute termination.
actual_sum = (await exec_compute_query(env, branch, 'SELECT SUM(i) FROM t'))[0][0]
assert actual_sum == sum, f'Expected sum={sum}, actual={actual_sum}'
elif i % 4 == 2:
await exec_compute_query(env, branch, 'UPDATE t SET i = i + 1')
sum += cnt
# A test which creates a compute for every query and then destroys it right after.
def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_compute_restarts')
asyncio.run(run_compute_restarts(env))
class BackgroundCompute(object):
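    """Background task that repeatedly inserts a single row, creating a fresh compute for each query."""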
MAX_QUERY_GAP_SECONDS = 2
def __init__(self, index: int, env: NeonEnv, branch: str):
self.index = index
self.env = env
self.branch = branch
self.running = False
self.stopped = False
self.total_tries = 0
self.successful_queries: List[int] = []
async def run(self):
if self.running:
raise Exception('BackgroundCompute is already running')
self.running = True
i = 0
while not self.stopped:
try:
verify_key = (self.index << 16) + i
i += 1
self.total_tries += 1
res = await exec_compute_query(
self.env,
self.branch,
f'INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key',
pgdir_name=f'bgcompute{self.index}_key{verify_key}',
)
log.info(f'result: {res}')
if len(res) != 1:
raise Exception('No result returned')
if res[0][0] != verify_key:
raise Exception('Wrong result returned')
self.successful_queries.append(verify_key)
except Exception as e:
log.info(f'BackgroundCompute {self.index} query failed: {e}')
            # With a shorter sleep there is a very high chance of committing nothing,
            # or only one xact, during the test run.
await asyncio.sleep(random.uniform(0, self.MAX_QUERY_GAP_SECONDS))
self.running = False
async def run_concurrent_computes(env: NeonEnv,
num_computes=10,
run_seconds=20,
branch='test_concurrent_computes'):
await exec_compute_query(
env,
branch,
'CREATE TABLE query_log (t timestamp default now(), index int, verify_key int)')
computes = [BackgroundCompute(i, env, branch) for i in range(num_computes)]
background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
await asyncio.sleep(run_seconds)
log.info("stopping all tasks but one")
for compute in computes[1:]:
compute.stopped = True
await asyncio.gather(*background_tasks[1:])
log.info("stopped all tasks but one")
# work for some time with only one compute -- it should be able to make some xacts
TIMEOUT_SECONDS = computes[0].MAX_QUERY_GAP_SECONDS + 3
initial_queries_by_0 = len(computes[0].successful_queries)
log.info(f'Waiting for another query by computes[0], '
f'it already had {initial_queries_by_0}, timeout is {TIMEOUT_SECONDS}s')
for _ in range(10 * TIMEOUT_SECONDS):
current_queries_by_0 = len(computes[0].successful_queries) - initial_queries_by_0
if current_queries_by_0 >= 1:
log.info(f'Found {current_queries_by_0} successful queries '
f'by computes[0], completing the test')
break
await asyncio.sleep(0.1)
else:
assert False, "Timed out while waiting for another query by computes[0]"
computes[0].stopped = True
await asyncio.gather(background_tasks[0])
result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
    # we should have inserted something while the single compute was running
log.info(f'Executed {len(result)} queries, {current_queries_by_0} of them '
f'by computes[0] after we started stopping the others')
for row in result:
log.info(f'{row[0]} {row[1]} {row[2]}')
# ensure everything reported as committed wasn't lost
for compute in computes:
for verify_key in compute.successful_queries:
assert verify_key in [row[2] for row in result]
# Run multiple computes concurrently, creating and destroying each one after a single
# query. Ensure we don't lose any xacts reported as committed and that we can still
# make progress once only one compute remains.
def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_concurrent_computes')
asyncio.run(run_concurrent_computes(env))
# Stop a safekeeper and check that a query cannot be executed while the safekeeper is down.
# The query inserts a single row into a table.
async def check_unavailability(sk: Safekeeper,
conn: asyncpg.Connection,
key: int,
start_delay_sec: int = 2):
    # shut down one of the two acceptors, i.e. the majority
sk.stop()
bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')"))
await asyncio.sleep(start_delay_sec)
# ensure that the query has not been executed yet
assert not bg_query.done()
# start safekeeper and await the query
sk.start()
await bg_query
assert bg_query.done()
async def run_unavailability(env: NeonEnv, pg: Postgres):
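    """Check that commits freeze while a majority of safekeepers is down and resume after restart."""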
conn = await pg.connect_async()
# check basic work with table
await conn.execute('CREATE TABLE t(key int primary key, value text)')
await conn.execute("INSERT INTO t values (1, 'payload')")
# stop safekeeper and check that query cannot be executed while safekeeper is down
await check_unavailability(env.safekeepers[0], conn, 2)
    # for the world's balance, do the same with the second safekeeper
await check_unavailability(env.safekeepers[1], conn, 3)
# check that we can execute queries after restart
await conn.execute("INSERT INTO t values (4, 'payload')")
result_sum = await conn.fetchval('SELECT sum(key) FROM t')
assert result_sum == 10
# When a majority of acceptors is offline, commits are expected to freeze
def test_unavailability(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 2
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_unavailability')
pg = env.postgres.create_start('test_safekeepers_unavailability')
asyncio.run(run_unavailability(env, pg))
@dataclass
class RaceConditionTest:
iteration: int
is_stopped: bool
# shut down a random subset of safekeepers, sleep, wake them up, rinse, repeat
async def xmas_garland(safekeepers: List[Safekeeper], data: RaceConditionTest):
while not data.is_stopped:
data.iteration += 1
victims = []
for sk in safekeepers:
if random.random() >= 0.5:
victims.append(sk)
log.info(
f'Iteration {data.iteration}: stopping {list(map(lambda sk: sk.id, victims))} safekeepers'
)
for v in victims:
v.stop()
await asyncio.sleep(1)
for v in victims:
v.start()
log.info(f'Iteration {data.iteration} finished')
await asyncio.sleep(1)
async def run_race_conditions(env: NeonEnv, pg: Postgres):
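    """Insert rows while xmas_garland() concurrently stops and starts random subsets of safekeepers."""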
conn = await pg.connect_async()
await conn.execute('CREATE TABLE t(key int primary key, value text)')
data = RaceConditionTest(0, False)
bg_xmas = asyncio.create_task(xmas_garland(env.safekeepers, data))
n_iterations = 5
expected_sum = 0
i = 1
while data.iteration <= n_iterations:
await asyncio.sleep(0.005)
await conn.execute(f"INSERT INTO t values ({i}, 'payload')")
expected_sum += i
i += 1
log.info(f'Executed {i-1} queries')
res = await conn.fetchval('SELECT sum(key) FROM t')
assert res == expected_sum
data.is_stopped = True
await bg_xmas
# do inserts while concurrently bringing subsets of acceptors up and down
def test_race_conditions(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_safekeepers_race_conditions')
pg = env.postgres.create_start('test_safekeepers_race_conditions')
asyncio.run(run_race_conditions(env, pg))
# Check that the pageserver can select the safekeeper with the largest commit_lsn
# and switch if the LSN is not updated for some time (NoWalTimeout).
async def run_wal_lagging(env: NeonEnv, pg: Postgres):
def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
# use ports 10, 11 and 12 to simulate unavailable safekeepers
return ','.join([
f'localhost:{sk.port.pg if active else 10 + i}'
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk))
])
conn = await pg.connect_async()
await conn.execute('CREATE TABLE t(key int primary key, value text)')
await conn.close()
pg.stop()
n_iterations = 20
n_txes = 10000
expected_sum = 0
i = 1
quorum = len(env.safekeepers) // 2 + 1
    it = 0
    while it < n_iterations:
        active_sk = list(map(lambda _: random.random() >= 0.5, env.safekeepers))
        active_count = sum(active_sk)
        if active_count < quorum:
            # not enough active safekeepers for a quorum, retry with a new random subset
            continue
        it += 1
pg.adjust_for_safekeepers(safekeepers_guc(env, active_sk))
log.info(f'Iteration {it}: {active_sk}')
pg.start()
conn = await pg.connect_async()
for _ in range(n_txes):
await conn.execute(f"INSERT INTO t values ({i}, 'payload')")
expected_sum += i
i += 1
await conn.close()
pg.stop()
pg.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers)))
pg.start()
conn = await pg.connect_async()
log.info(f'Executed {i-1} queries')
res = await conn.fetchval('SELECT sum(key) FROM t')
assert res == expected_sum
# do inserts while restarting postgres and messing with safekeeper addresses
def test_wal_lagging(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_wal_lagging')
pg = env.postgres.create_start('test_wal_lagging')
asyncio.run(run_wal_lagging(env, pg))