Compare commits

...

3 Commits

Author SHA1 Message Date
Arthur Petukhovsky
686d199acf Run yapf 2021-12-09 12:30:20 +03:00
Arthur Petukhovsky
60783b986d Add epoch switch 2021-12-09 12:16:07 +03:00
Arthur Petukhovsky
1de822f1b0 Create test_sync_safekeepers_old_term_ahead 2021-12-09 12:13:02 +03:00

View File

@@ -344,7 +344,7 @@ class ProposerPostgres:
f"wal_acceptors = '{wal_acceptors}'\n",
])
def sync_safekeepers(self) -> str:
def sync_safekeepers(self, timeout=60) -> str:
"""
Run 'postgres --sync-safekeepers'.
Returns execution result, which is commit_lsn after sync.
@@ -355,7 +355,7 @@ class ProposerPostgres:
"PGDATA": self.pg_data_dir_path(),
}
basepath = self.pg_bin.run_capture(command, env)
basepath = self.pg_bin.run_capture(command, env, timeout=timeout)
stdout_filename = basepath + '.stdout'
with open(stdout_filename, 'r') as stdout_f:
@@ -438,3 +438,90 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
assert epoch_after_reboot > epoch
class WalAppender:
"""Helper for appending WAL to safekeepers, keeps track of last inserted lsn."""
# 0/16B9188 is good lsn to start with, it's valid and not in the segment start, nor in zero segment
def __init__(self,
acceptors,
tenant_id,
timeline_id,
epoch_start_lsn=0x16B9188,
begin_lsn=0x16B9188):
self.acceptors = acceptors
self.epoch_start_lsn = epoch_start_lsn
self.begin_lsn = begin_lsn
self.tenant_id = tenant_id
self.timeline_id = timeline_id
self.flush_lsns = dict()
def append(self, i, term, lm_message="message", lm_prefix="", set_commit_lsn=False):
"""Append new logical message to i'th safekeeper."""
lsn = self.flush_lsns.get(i, self.begin_lsn)
req = {
"lm_prefix": lm_prefix,
"lm_message": lm_message,
"set_commit_lsn": set_commit_lsn,
"term": term,
"begin_lsn": lsn,
"epoch_start_lsn": self.epoch_start_lsn,
"truncate_lsn": self.begin_lsn,
}
res = self.acceptors[i].append_logical_message(
self.tenant_id,
self.timeline_id,
req,
)
end_lsn = res["inserted_wal"]["end_lsn"]
self.flush_lsns[i] = end_lsn
return res
def debug_print(self):
"""Print lsn for each acceptor."""
for i, lsn in self.flush_lsns.items():
print(f'end_lsn for acceptors[{i}] = {lsn_to_hex(lsn)}')
# one safekeeper with old term and a lot of non-commited wal
def test_sync_safekeepers_old_term_ahead(repo_dir: str,
pg_bin: PgBin,
wa_factory: WalAcceptorFactory):
wa_factory.start_n_new(3)
timeline_id = uuid.uuid4().hex
tenant_id = uuid.uuid4().hex
# write config for proposer
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
pg.create_dir_config(wa_factory.get_connstrs())
# append wal to safekeepers
appender = WalAppender(wa_factory.instances, tenant_id, timeline_id)
appender.append(0, term=1, lm_message="msg1")
appender.append(1, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
new_epoch = appender.flush_lsns[0]
appender.epoch_start_lsn = new_epoch
appender.append(0, term=2, lm_message="msg2")
appender.append(1, term=2, lm_message="msg2")
# run sync safekeepers
# FIXME: fails with timeout
lsn_after_sync = pg.sync_safekeepers()
print(f"lsn after sync = {lsn_after_sync}")
appender.debug_print()