Pull clone timeline from peer safekeepers (#4089)

Add HTTP endpoint to initialize safekeeper timeline from peer
safekeepers. This is useful for initializing new safekeeper to replace
failed safekeeper. Not fully "correct" in all cases, but should work in
most.

This code is not suitable for production workloads but can be tested on
staging to get started. New endpoint is separated from usual cases and
should not affect anything if no one explicitly uses a new endpoint. We
can rollback this commit in case of issues.
This commit is contained in:
Arthur Petukhovsky
2023-04-28 17:20:46 +03:00
committed by GitHub
parent ec53c5ca2e
commit 8543485e92
10 changed files with 424 additions and 7 deletions

View File

@@ -2661,6 +2661,13 @@ class SafekeeperHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_create(
self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn
):

View File

@@ -1254,3 +1254,98 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
with closing(endpoint_other.connect()) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO t (key) VALUES (123)")
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
cur.execute("INSERT INTO t VALUES (0, 'something')")
sum_before = query_scalar(cur, "SELECT SUM(key) FROM t")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
sum_after = query_scalar(cur, "SELECT SUM(key) FROM t")
assert sum_after == sum_before + 5000050000
def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
for sk in safekeepers:
http_cli = sk.http_client()
try:
status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"Safekeeper {sk.id} status: {status}")
except Exception as e:
log.info(f"Safekeeper {sk.id} status error: {e}")
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_pull_timeline")
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
active_safekeepers = [1, 2, 3]
endpoint = env.endpoints.create("test_pull_timeline")
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
endpoint.start()
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Kill safekeeper 2, continue with payload")
env.safekeepers[1].stop(immediate=True)
execute_payload(endpoint)
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
env.safekeepers[3].start()
res = (
env.safekeepers[3]
.http_client()
.pull_timeline(
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": [
f"http://localhost:{env.safekeepers[0].port.http}",
f"http://localhost:{env.safekeepers[2].port.http}",
],
}
)
)
log.info("Finished pulling timeline")
log.info(res)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Restarting compute with new config to verify that it works")
active_safekeepers = [1, 3, 4]
endpoint.stop_and_destroy().create("test_pull_timeline")
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
endpoint.start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4")
env.safekeepers[0].stop(immediate=True)
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Restart sk4 and and use quorum of sk1 and sk4")
env.safekeepers[3].stop()
env.safekeepers[2].stop()
env.safekeepers[0].start()
env.safekeepers[3].start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)