mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
Wait for safekeepers to catch up in test_restarts_under_load (#776)
This commit is contained in:
committed by
GitHub
parent
85116a8375
commit
13f4e173c9
@@ -1,9 +1,11 @@
|
||||
import asyncio
|
||||
import asyncpg
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.zenith_fixtures import WalAcceptor, WalAcceptorFactory, ZenithPageserver, PostgresFactory, Postgres
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.utils import lsn_from_hex, lsn_to_hex
|
||||
from typing import List
|
||||
|
||||
log = getLogger('root.wal_acceptor_async')
|
||||
@@ -102,6 +104,38 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
|
||||
await pg_conn.close()
|
||||
|
||||
|
||||
async def wait_for_lsn(safekeeper: WalAcceptor,
                       tenant_id: str,
                       timeline_id: str,
                       wait_lsn: str,
                       polling_interval=1,
                       timeout=600):
    """
    Poll flush_lsn from safekeeper until it is greater than or equal to
    the provided wait_lsn. To do that, timeline_status is fetched from
    safekeeper every polling_interval seconds.

    wait_lsn and the reported flush_lsn are hex-notation strings that are
    compared numerically via lsn_from_hex.

    Raises RuntimeError if flush_lsn is still behind wait_lsn when more than
    `timeout` seconds have elapsed since the call started. The deadline is
    only checked between polls, so the total wait can exceed `timeout` by up
    to one polling interval plus one status request.
    """

    # Use a monotonic clock for the deadline: wall-clock adjustments during
    # the wait must neither shorten nor extend the timeout.
    started_at = time.monotonic()
    client = safekeeper.http_client()

    # NOTE(review): timeline_status looks like a synchronous HTTP call, so it
    # presumably blocks the event loop while the request is in flight --
    # confirm this is acceptable for the concurrently running workers.
    flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
    log.info(
        f'Safekeeper at port {safekeeper.port.pg} has flush_lsn {flush_lsn}, waiting for lsn {wait_lsn}'
    )

    # The target never changes, so parse it once instead of on every poll.
    target_lsn = lsn_from_hex(wait_lsn)

    while target_lsn > lsn_from_hex(flush_lsn):
        elapsed = time.monotonic() - started_at
        if elapsed > timeout:
            raise RuntimeError(
                f"timed out waiting for safekeeper at port {safekeeper.port.pg} to reach {wait_lsn}, current lsn is {flush_lsn}"
            )

        await asyncio.sleep(polling_interval)
        flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
        log.debug(f'safekeeper port={safekeeper.port.pg} flush_lsn={flush_lsn} wait_lsn={wait_lsn}')
|
||||
|
||||
|
||||
# This test will run several iterations and check progress in each of them.
|
||||
# On each iteration 1 acceptor is stopped, and 2 others should allow
|
||||
# background workers to execute transactions. In the end, state should remain
|
||||
@@ -114,6 +148,9 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
|
||||
iterations = 6
|
||||
|
||||
pg_conn = await pg.connect_async()
|
||||
tenant_id = await pg_conn.fetchval("show zenith.zenith_tenant")
|
||||
timeline_id = await pg_conn.fetchval("show zenith.zenith_timeline")
|
||||
|
||||
bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
|
||||
# create tables and initial balances
|
||||
await bank.initdb()
|
||||
@@ -125,14 +162,18 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
|
||||
workers.append(asyncio.create_task(worker))
|
||||
|
||||
for it in range(iterations):
|
||||
victim = acceptors[it % len(acceptors)]
|
||||
victim_idx = it % len(acceptors)
|
||||
victim = acceptors[victim_idx]
|
||||
victim.stop()
|
||||
|
||||
# Wait till previous victim recovers so it is ready for the next
|
||||
# iteration by making any writing xact.
|
||||
conn = await pg.connect_async()
|
||||
await conn.execute('UPDATE bank_accs SET amount = amount WHERE uid = 1', timeout=120)
|
||||
await conn.close()
|
||||
flush_lsn = await pg_conn.fetchval('SELECT pg_current_wal_flush_lsn()')
|
||||
flush_lsn = lsn_to_hex(flush_lsn)
|
||||
log.info(f'Postgres flush_lsn {flush_lsn}')
|
||||
|
||||
# Wait until alive safekeepers catch up with postgres
|
||||
for idx, safekeeper in enumerate(acceptors):
|
||||
if idx != victim_idx:
|
||||
await wait_for_lsn(safekeeper, tenant_id, timeline_id, flush_lsn)
|
||||
|
||||
stats.reset()
|
||||
await asyncio.sleep(period_time)
|
||||
|
||||
@@ -63,3 +63,9 @@ def global_counter() -> int:
|
||||
def lsn_to_hex(num: int) -> str:
    """ Render an integer lsn in 'HIGH/LOW' uppercase hex notation.

    HIGH is the value shifted right by 32 bits and LOW is its low 32 bits;
    neither part is zero-padded.
    """
    high = num >> 32
    low = num & 0xffffffff
    return f"{high:X}/{low:X}"
|
||||
|
||||
|
||||
def lsn_from_hex(lsn_hex: str) -> int:
    """ Parse an lsn written as 'HIGH/LOW' hex notation into an int.

    The string must contain exactly one '/' separating two hexadecimal
    numbers; otherwise the tuple unpacking or int() raises ValueError.
    """
    high_hex, low_hex = lsn_hex.split('/')
    high = int(high_hex, 16)
    low = int(low_hex, 16)
    return (high << 32) + low
|
||||
|
||||
@@ -1031,8 +1031,9 @@ def wa_factory(zenith_binpath: str,
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverTimelineStatus:
|
||||
class SafekeeperTimelineStatus:
|
||||
acceptor_epoch: int
|
||||
flush_lsn: str
|
||||
|
||||
|
||||
class WalAcceptorHttpClient(requests.Session):
|
||||
@@ -1043,11 +1044,12 @@ class WalAcceptorHttpClient(requests.Session):
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def timeline_status(self, tenant_id: str, timeline_id: str) -> PageserverTimelineStatus:
|
||||
def timeline_status(self, tenant_id: str, timeline_id: str) -> SafekeeperTimelineStatus:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
|
||||
res.raise_for_status()
|
||||
resj = res.json()
|
||||
return PageserverTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'])
|
||||
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
|
||||
flush_lsn=resj['flush_lsn'])
|
||||
|
||||
|
||||
@zenfixture
|
||||
|
||||
@@ -49,6 +49,8 @@ struct TimelineStatus {
|
||||
commit_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
truncate_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
flush_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Report info about timeline.
|
||||
@@ -64,6 +66,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
)
|
||||
.map_err(ApiError::from_err)?;
|
||||
let sk_state = tli.get_info();
|
||||
let (flush_lsn, _) = tli.get_end_of_wal();
|
||||
|
||||
let status = TimelineStatus {
|
||||
tenant_id,
|
||||
@@ -71,6 +74,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
acceptor_state: sk_state.acceptor_state,
|
||||
commit_lsn: sk_state.commit_lsn,
|
||||
truncate_lsn: sk_state.truncate_lsn,
|
||||
flush_lsn,
|
||||
};
|
||||
Ok(json_response(StatusCode::OK, status)?)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user