diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index e5351912fa..0177555787 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -344,7 +344,7 @@ class ProposerPostgres: f"wal_acceptors = '{wal_acceptors}'\n", ]) - def sync_safekeepers(self) -> str: + def sync_safekeepers(self, timeout=60) -> str: """ Run 'postgres --sync-safekeepers'. Returns execution result, which is commit_lsn after sync. @@ -355,7 +355,7 @@ class ProposerPostgres: "PGDATA": self.pg_data_dir_path(), } - basepath = self.pg_bin.run_capture(command, env) + basepath = self.pg_bin.run_capture(command, env, timeout=timeout) stdout_filename = basepath + '.stdout' with open(stdout_filename, 'r') as stdout_f: @@ -438,3 +438,79 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder): epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch assert epoch_after_reboot > epoch + + +class WalAppender: + """Helper for appending WAL to safekeepers, keeps track of last inserted lsn.""" + + # 0/16B9188 is good lsn to start with, it's valid and not in the segment start, nor in zero segment + def __init__(self, acceptors, tenant_id, timeline_id, epoch_start_lsn=0x16B9188, begin_lsn=0x16B9188): + self.acceptors = acceptors + self.epoch_start_lsn = epoch_start_lsn + self.begin_lsn = begin_lsn + self.tenant_id = tenant_id + self.timeline_id = timeline_id + self.flush_lsns = dict() + + def append(self, i, term, lm_message = "message", lm_prefix = "", set_commit_lsn = False): + """Append new logical message to i'th safekeeper.""" + lsn = self.flush_lsns.get(i, self.begin_lsn) + req = { + "lm_prefix": lm_prefix, + "lm_message": lm_message, + "set_commit_lsn": set_commit_lsn, + "term": term, + "begin_lsn": lsn, + "epoch_start_lsn": self.epoch_start_lsn, + "truncate_lsn": self.begin_lsn, + } + + res = self.acceptors[i].append_logical_message( + self.tenant_id, + self.timeline_id, + req, + ) + + end_lsn = res["inserted_wal"]["end_lsn"] + self.flush_lsns[i] = end_lsn + + return res + + def debug_print(self): + """Print lsn for each acceptor.""" + for i, lsn in self.flush_lsns.items(): + print(f'end_lsn for acceptors[{i}] = {lsn_to_hex(lsn)}') + +# one safekeeper with old term and a lot of non-commited wal +def test_sync_safekeepers_old_term_ahead(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory): + wa_factory.start_n_new(3) + + timeline_id = uuid.uuid4().hex + tenant_id = uuid.uuid4().hex + + # write config for proposer + pgdata_dir = os.path.join(repo_dir, "proposer_pgdata") + pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id) + pg.create_dir_config(wa_factory.get_connstrs()) + + # append wal to safekeepers + appender = WalAppender(wa_factory.instances, tenant_id, timeline_id) + + appender.append(0, term=1, lm_message="msg1") + appender.append(1, term=1, lm_message="msg1") + appender.append(2, term=1, lm_message="msg1") + + appender.append(2, term=1, lm_message="msg1") + appender.append(2, term=1, lm_message="msg1") + appender.append(2, term=1, lm_message="msg1") + appender.append(2, term=1, lm_message="msg1") + + appender.append(0, term=2, lm_message="msg2") + appender.append(1, term=2, lm_message="msg2") + + # run sync safekeepers + # FIXME: fails with timeout + lsn_after_sync = pg.sync_safekeepers() + print(f"lsn after sync = {lsn_after_sync}") + + appender.debug_print() diff --git a/vendor/postgres b/vendor/postgres index a97cfe8ed7..56c561aa77 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit a97cfe8ed781857b2880bf1e0b4ff04972b9c8dc +Subproject commit 56c561aa770b01d9e5cfcbf064eebe33349b7f77