mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Create test_sync_safekeepers_old_term_ahead
This commit is contained in:
@@ -344,7 +344,7 @@ class ProposerPostgres:
|
||||
f"wal_acceptors = '{wal_acceptors}'\n",
|
||||
])
|
||||
|
||||
def sync_safekeepers(self) -> str:
|
||||
def sync_safekeepers(self, timeout=60) -> str:
|
||||
"""
|
||||
Run 'postgres --sync-safekeepers'.
|
||||
Returns execution result, which is commit_lsn after sync.
|
||||
@@ -355,7 +355,7 @@ class ProposerPostgres:
|
||||
"PGDATA": self.pg_data_dir_path(),
|
||||
}
|
||||
|
||||
basepath = self.pg_bin.run_capture(command, env)
|
||||
basepath = self.pg_bin.run_capture(command, env, timeout=timeout)
|
||||
stdout_filename = basepath + '.stdout'
|
||||
|
||||
with open(stdout_filename, 'r') as stdout_f:
|
||||
@@ -438,3 +438,79 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
|
||||
assert epoch_after_reboot > epoch
|
||||
|
||||
|
||||
class WalAppender:
|
||||
"""Helper for appending WAL to safekeepers, keeps track of last inserted lsn."""
|
||||
|
||||
# 0/16B9188 is good lsn to start with, it's valid and not in the segment start, nor in zero segment
|
||||
def __init__(self, acceptors, tenant_id, timeline_id, epoch_start_lsn=0x16B9188, begin_lsn=0x16B9188):
|
||||
self.acceptors = acceptors
|
||||
self.epoch_start_lsn = epoch_start_lsn
|
||||
self.begin_lsn = begin_lsn
|
||||
self.tenant_id = tenant_id
|
||||
self.timeline_id = timeline_id
|
||||
self.flush_lsns = dict()
|
||||
|
||||
def append(self, i, term, lm_message = "message", lm_prefix = "", set_commit_lsn = False):
|
||||
"""Append new logical message to i'th safekeeper."""
|
||||
lsn = self.flush_lsns.get(i, self.begin_lsn)
|
||||
req = {
|
||||
"lm_prefix": lm_prefix,
|
||||
"lm_message": lm_message,
|
||||
"set_commit_lsn": set_commit_lsn,
|
||||
"term": term,
|
||||
"begin_lsn": lsn,
|
||||
"epoch_start_lsn": self.epoch_start_lsn,
|
||||
"truncate_lsn": self.begin_lsn,
|
||||
}
|
||||
|
||||
res = self.acceptors[i].append_logical_message(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
req,
|
||||
)
|
||||
|
||||
end_lsn = res["inserted_wal"]["end_lsn"]
|
||||
self.flush_lsns[i] = end_lsn
|
||||
|
||||
return res
|
||||
|
||||
def debug_print(self):
|
||||
"""Print lsn for each acceptor."""
|
||||
for i, lsn in self.flush_lsns.items():
|
||||
print(f'end_lsn for acceptors[{i}] = {lsn_to_hex(lsn)}')
|
||||
|
||||
# one safekeeper with old term and a lot of non-commited wal
|
||||
def test_sync_safekeepers_old_term_ahead(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory):
|
||||
wa_factory.start_n_new(3)
|
||||
|
||||
timeline_id = uuid.uuid4().hex
|
||||
tenant_id = uuid.uuid4().hex
|
||||
|
||||
# write config for proposer
|
||||
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
|
||||
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
|
||||
pg.create_dir_config(wa_factory.get_connstrs())
|
||||
|
||||
# append wal to safekeepers
|
||||
appender = WalAppender(wa_factory.instances, tenant_id, timeline_id)
|
||||
|
||||
appender.append(0, term=1, lm_message="msg1")
|
||||
appender.append(1, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
|
||||
appender.append(0, term=2, lm_message="msg2")
|
||||
appender.append(1, term=2, lm_message="msg2")
|
||||
|
||||
# run sync safekeepers
|
||||
# FIXME: fails with timeout
|
||||
lsn_after_sync = pg.sync_safekeepers()
|
||||
print(f"lsn after sync = {lsn_after_sync}")
|
||||
|
||||
appender.debug_print()
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: a97cfe8ed7...56c561aa77
Reference in New Issue
Block a user