walproposer: make it aware of membership (#11407)

## Problem

Walproposer should get elected and commit WAL on safekeepers specified
by the membership configuration.

## Summary of changes

- Add to wp `members_safekeepers` and `new_members_safekeepers` arrays
mapping configuration members to connection slots. Establish this
mapping (by node id) when safekeeper sends greeting, giving its id and
when mconf becomes known / changes.
- Add to TermsCollected, VotesCollected,
GetAcknowledgedByQuorumWALPosition membership aware logic. Currently it
partially duplicates existing one, but we'll drop the latter eventually.
- In python, rename Configuration to MembershipConfiguration for
clarity.
- Add test_quorum_sanity testing new logic.

ref https://github.com/neondatabase/neon/issues/10851
This commit is contained in:
Arseny Sher
2025-04-10 12:55:37 +03:00
committed by GitHub
parent 8a72e6f888
commit fae7528adb
6 changed files with 644 additions and 122 deletions

View File

@@ -79,7 +79,12 @@ from fixtures.remote_storage import (
default_remote_storage,
remote_storage_to_toml_dict,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.http import (
MembershipConfiguration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
@@ -4839,6 +4844,50 @@ class Safekeeper(LogUtils):
wait_until(paused)
@staticmethod
def sks_to_safekeeper_ids(sks: list[Safekeeper]) -> list[SafekeeperId]:
return [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in sks]
@staticmethod
def mconf_sks(env: NeonEnv, mconf: MembershipConfiguration) -> list[Safekeeper]:
"""
List of Safekeepers which are members in `mconf`.
"""
members_ids = [m.id for m in mconf.members]
new_members_ids = [m.id for m in mconf.new_members] if mconf.new_members is not None else []
return [sk for sk in env.safekeepers if sk.id in members_ids or sk.id in new_members_ids]
@staticmethod
def create_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
mconf: MembershipConfiguration,
members_sks: list[Safekeeper],
):
"""
Manually create timeline on safekeepers with given (presumably inital)
mconf: figure out LSN from pageserver, bake request and execute it on
given safekeepers.
Normally done by storcon, but some tests want to do it manually so far.
"""
ps_http_cli = ps.http_client()
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk in members_sks:
sk.http_client().timeline_create(create_r)
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""

View File

@@ -25,7 +25,7 @@ class Walreceiver:
@dataclass
class SafekeeperTimelineStatus:
mconf: Configuration | None
mconf: MembershipConfiguration | None
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
@@ -78,17 +78,17 @@ class SafekeeperId:
@dataclass
class Configuration:
class MembershipConfiguration:
generation: int
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None
@classmethod
def from_json(cls, d: dict[str, Any]) -> Configuration:
def from_json(cls, d: dict[str, Any]) -> MembershipConfiguration:
generation = d["generation"]
members = d["members"]
new_members = d.get("new_members")
return Configuration(generation, members, new_members)
return MembershipConfiguration(generation, members, new_members)
def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)
@@ -98,7 +98,7 @@ class Configuration:
class TimelineCreateRequest:
tenant_id: TenantId
timeline_id: TimelineId
mconf: Configuration
mconf: MembershipConfiguration
# not exactly PgVersion, for example 150002 for 15.2
pg_version: int
start_lsn: Lsn
@@ -110,13 +110,13 @@ class TimelineCreateRequest:
@dataclass
class TimelineMembershipSwitchResponse:
previous_conf: Configuration
current_conf: Configuration
previous_conf: MembershipConfiguration
current_conf: MembershipConfiguration
@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
previous_conf = Configuration.from_json(d["previous_conf"])
current_conf = Configuration.from_json(d["current_conf"])
previous_conf = MembershipConfiguration.from_json(d["previous_conf"])
current_conf = MembershipConfiguration.from_json(d["current_conf"])
return TimelineMembershipSwitchResponse(previous_conf, current_conf)
@@ -194,7 +194,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
# It is always normally not None, it is allowed only to make forward compat tests happy.
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
mconf = MembershipConfiguration.from_json(resj["mconf"]) if "mconf" in resj else None
return SafekeeperTimelineStatus(
mconf=mconf,
term=resj["acceptor_state"]["term"],
@@ -223,7 +223,9 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return self.timeline_status(tenant_id, timeline_id).commit_lsn
# Get timeline membership configuration.
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
def get_membership(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> MembershipConfiguration:
# make mypy happy
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore
@@ -275,7 +277,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return res_json
def timeline_exclude(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
) -> dict[str, Any]:
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/exclude",
@@ -287,7 +289,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return res_json
def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
) -> TimelineMembershipSwitchResponse:
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",

View File

@@ -45,7 +45,7 @@ from fixtures.remote_storage import (
s3_storage,
)
from fixtures.safekeeper.http import (
Configuration,
MembershipConfiguration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
@@ -589,7 +589,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
mconf = Configuration(generation=0, members=[], new_members=None)
mconf = MembershipConfiguration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could you intidb LSN as well).
r = TimelineCreateRequest(
@@ -1948,7 +1948,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
init_conf = MembershipConfiguration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
@@ -1960,7 +1960,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
http_cli.timeline_create(create_r)
# Switch into some conf.
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
joint_conf = MembershipConfiguration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
@@ -1973,24 +1973,26 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
assert after_restart.generation == 4
# Switch into non joint conf of which sk is not a member, must fail.
non_joint_not_member = Configuration(generation=5, members=[sk_id_2], new_members=None)
non_joint_not_member = MembershipConfiguration(
generation=5, members=[sk_id_2], new_members=None
)
with pytest.raises(requests.exceptions.HTTPError):
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member)
# Switch into good non joint conf.
non_joint = Configuration(generation=6, members=[sk_id_1], new_members=None)
non_joint = MembershipConfiguration(generation=6, members=[sk_id_1], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 6
# Switch request to lower conf should be rejected.
lower_conf = Configuration(generation=3, members=[sk_id_1], new_members=None)
lower_conf = MembershipConfiguration(generation=3, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
# Now, exclude sk from the membership, timeline should be deleted.
excluded_conf = Configuration(generation=7, members=[sk_id_2], new_members=None)
excluded_conf = MembershipConfiguration(generation=7, members=[sk_id_2], new_members=None)
http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.timeline_status(tenant_id, timeline_id)
@@ -2010,11 +2012,6 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps = env.pageservers[0]
ps_http_cli = ps.http_client()
http_clis = [sk.http_client() for sk in env.safekeepers]
config_lines = [
"neon.safekeeper_proto_version = 3",
]
@@ -2023,22 +2020,11 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s")
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers]
mconf = Configuration(generation=1, members=sk_ids, new_members=None)
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
mconf = MembershipConfiguration(
generation=1, members=Safekeeper.sks_to_safekeeper_ids(env.safekeepers), new_members=None
)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk_http_cli in http_clis:
sk_http_cli.timeline_create(create_r)
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, env.safekeepers)
# Once timeline created endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

View File

@@ -18,6 +18,7 @@ from fixtures.neon_fixtures import (
Safekeeper,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.safekeeper.http import MembershipConfiguration
from fixtures.utils import skip_in_debug_build
if TYPE_CHECKING:
@@ -452,20 +453,24 @@ def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_concurrent_computes(env))
async def assert_query_hangs(endpoint: Endpoint, query: str):
"""
Start on endpoint query which is expected to hang and check that it does.
"""
conn = await endpoint.connect_async()
bg_query = asyncio.create_task(conn.execute(query))
await asyncio.sleep(2)
assert not bg_query.done()
return bg_query
# Stop safekeeper and check that query cannot be executed while safekeeper is down.
# Query will insert a single row into a table.
async def check_unavailability(
sk: Safekeeper, conn: asyncpg.Connection, key: int, start_delay_sec: int = 2
):
async def check_unavailability(sk: Safekeeper, ep: Endpoint, key: int, start_delay_sec: int = 2):
# shutdown one of two acceptors, that is, majority
sk.stop()
bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')"))
await asyncio.sleep(start_delay_sec)
# ensure that the query has not been executed yet
assert not bg_query.done()
bg_query = await assert_query_hangs(ep, f"INSERT INTO t values ({key}, 'payload')")
# start safekeeper and await the query
sk.start()
await bg_query
@@ -480,10 +485,10 @@ async def run_unavailability(env: NeonEnv, endpoint: Endpoint):
await conn.execute("INSERT INTO t values (1, 'payload')")
# stop safekeeper and check that query cannot be executed while safekeeper is down
await check_unavailability(env.safekeepers[0], conn, 2)
await check_unavailability(env.safekeepers[0], endpoint, 2)
# for the world's balance, do the same with second safekeeper
await check_unavailability(env.safekeepers[1], conn, 3)
await check_unavailability(env.safekeepers[1], endpoint, 3)
# check that we can execute queries after restart
await conn.execute("INSERT INTO t values (4, 'payload')")
@@ -514,15 +519,7 @@ async def run_recovery_uncommitted(env: NeonEnv):
# insert with only one safekeeper up to create tail of flushed but not committed WAL
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
await assert_query_hangs(ep, "insert into t select generate_series(1, 2000), 'payload'")
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
@@ -559,15 +556,7 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
await assert_query_hangs(ep, "insert into t select generate_series(1, 180000), 'Papaya'")
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
@@ -607,6 +596,132 @@ def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_versi
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
async def quorum_sanity_single(
env: NeonEnv,
compute_sks_ids: list[int],
members_sks_ids: list[int],
new_members_sks_ids: list[int] | None,
sks_to_stop_ids: list[int],
should_work_when_stopped: bool,
):
"""
*_ids params contain safekeeper node ids; it is assumed they are issued
from 1 and sequentially assigned to env.safekeepers.
"""
members_sks = [env.safekeepers[i - 1] for i in members_sks_ids]
new_members_sks = (
[env.safekeepers[i - 1] for i in new_members_sks_ids] if new_members_sks_ids else None
)
sks_to_stop = [env.safekeepers[i - 1] for i in sks_to_stop_ids]
mconf = MembershipConfiguration(
generation=1,
members=Safekeeper.sks_to_safekeeper_ids(members_sks),
new_members=Safekeeper.sks_to_safekeeper_ids(new_members_sks) if new_members_sks else None,
)
members_sks = Safekeeper.mconf_sks(env, mconf)
tenant_id = env.initial_tenant
compute_sks_ids_str = "-".join([str(sk_id) for sk_id in compute_sks_ids])
members_sks_ids_str = "-".join([str(sk.id) for sk in mconf.members])
new_members_sks_ids_str = "-".join(
[str(sk.id) for sk in mconf.new_members] if mconf.new_members is not None else []
)
sks_to_stop_ids_str = "-".join([str(sk.id) for sk in sks_to_stop])
log.info(
f"running quorum_sanity_single with compute_sks={compute_sks_ids_str}, members_sks={members_sks_ids_str}, new_members_sks={new_members_sks_ids_str}, sks_to_stop={sks_to_stop_ids_str}, should_work_when_stopped={should_work_when_stopped}"
)
branch_name = f"test_quorum_single_c{compute_sks_ids_str}_m{members_sks_ids_str}_{new_members_sks_ids_str}_s{sks_to_stop_ids_str}"
timeline_id = env.create_branch(branch_name)
# create timeline on `members_sks`
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create(branch_name, config_lines=config_lines)
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
ep.safe_psql("create table t(key int, value text)")
# stop specified sks and check whether writes work
for sk in sks_to_stop:
sk.stop()
if should_work_when_stopped:
log.info("checking that writes still work")
ep.safe_psql("insert into t select generate_series(1, 100), 'Papaya'")
# restarting ep should also be fine
ep.stop()
ep.start()
ep.safe_psql("insert into t select generate_series(1, 100), 'plum'")
bg_query = None
else:
log.info("checking that writes hang")
bg_query = await assert_query_hangs(
ep, "insert into t select generate_series(1, 100), 'Papaya'"
)
# start again; now they should work
for sk in sks_to_stop:
sk.start()
if bg_query:
log.info("awaiting query")
await bg_query
# It's a bit tempting to iterate over all possible combinations, but let's stick
# with this for now.
async def run_quorum_sanity(env: NeonEnv):
# 3 members, all up, should work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [], True)
# 3 members, 2/3 up, should work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [3], True)
# 3 members, 1/3 up, should not work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [2, 3], False)
# 3 members, all up, should work; wp redundantly talks to 4th.
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], None, [], True)
# 3 members, all up, should work with wp talking to 2 of these 3 + plus one redundant
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [], True)
# 3 members, 2/3 up, could work but wp talks to different 3s, so it shouldn't
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [3], False)
# joint conf of 1-2-3 and 4, all up, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [], True)
# joint conf of 1-2-3 and 4, 4 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [4], False)
# joint conf of 1-2-3 and 2-3-4, all up, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [], True)
# joint conf of 1-2-3 and 2-3-4, 1 and 4 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 4], True)
# joint conf of 1-2-3 and 2-3-4, 2 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2], True)
# joint conf of 1-2-3 and 2-3-4, 3 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [3], True)
# joint conf of 1-2-3 and 2-3-4, 1 and 2 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 2], False)
# joint conf of 1-2-3 and 2-3-4, 2 and 4 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2, 4], False)
# joint conf of 1-2-3 and 2-3-4 with wp talking to 2-3-4 only.
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [], True)
# with 1 down should still be ok
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [1], True)
# but with 2 down not ok
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False)
# Test various combinations of membership configurations / neon.safekeepers
# (list of safekeepers endpoint connects to) values / up & down safekeepers and
# check that endpont can start and write data when we have quorum and can't when
# we don't.
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
asyncio.run(run_quorum_sanity(env))
async def run_segment_init_failure(env: NeonEnv):
env.create_branch("test_segment_init_failure")
ep = env.endpoints.create_start("test_segment_init_failure")