mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Fix checkpoints race condition in safekeeper tests (#2175)
We should wait for WAL to arrive to pageserver before calling CHECKPOINT
This commit is contained in:
committed by
GitHub
parent
aeb3f0ea07
commit
09ddd34b2a
@@ -14,13 +14,43 @@ from contextlib import closing
|
||||
from dataclasses import dataclass, field
|
||||
from multiprocessing import Process, Value
|
||||
from pathlib import Path
|
||||
from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol
|
||||
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex
|
||||
from fixtures.log_helper import log
|
||||
from typing import List, Optional, Any
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint(tenant_id: str,
|
||||
timeline_id: str,
|
||||
pg: Postgres,
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options={}):
|
||||
lsn = lsn_from_hex(pg.safe_psql('SELECT pg_current_wal_flush_lsn()')[0][0])
|
||||
log.info(f"pg_current_wal_flush_lsn is {lsn_to_hex(lsn)}, waiting for it on pageserver")
|
||||
|
||||
auth_token = None
|
||||
if 'password' in pageserver_conn_options:
|
||||
auth_token = pageserver_conn_options['password']
|
||||
|
||||
# wait for the pageserver to catch up
|
||||
wait_for_last_record_lsn(ps.http_client(auth_token=auth_token),
|
||||
uuid.UUID(hex=tenant_id),
|
||||
uuid.UUID(hex=timeline_id),
|
||||
lsn)
|
||||
|
||||
# force checkpoint to advance remote_consistent_lsn
|
||||
with closing(ps.connect(**pageserver_conn_options)) as psconn:
|
||||
with psconn.cursor() as pscur:
|
||||
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
|
||||
|
||||
# ensure that remote_consistent_lsn is advanced
|
||||
wait_for_upload(ps.http_client(auth_token=auth_token),
|
||||
uuid.UUID(hex=tenant_id),
|
||||
uuid.UUID(hex=timeline_id),
|
||||
lsn)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineMetrics:
|
||||
timeline_id: str
|
||||
@@ -223,10 +253,10 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
|
||||
log.info(f"statuses is {stat_before}")
|
||||
|
||||
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
|
||||
# force checkpoint to advance remote_consistent_lsn
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
with psconn.cursor() as pscur:
|
||||
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
|
||||
|
||||
# force checkpoint in pageserver to advance remote_consistent_lsn
|
||||
wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver)
|
||||
|
||||
# and wait till remote_consistent_lsn propagates to all safekeepers
|
||||
started_at = time.time()
|
||||
while True:
|
||||
@@ -270,9 +300,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
pageserver_conn_options = {}
|
||||
if auth_enabled:
|
||||
pageserver_conn_options['password'] = env.auth_keys.generate_tenant_token(tenant_id)
|
||||
with closing(env.pageserver.connect(**pageserver_conn_options)) as psconn:
|
||||
with psconn.cursor() as pscur:
|
||||
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
|
||||
wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver, pageserver_conn_options)
|
||||
|
||||
# We will wait for first segment removal. Make sure they exist for starter.
|
||||
first_segments = [
|
||||
|
||||
Reference in New Issue
Block a user