Merge branch 'main' into 22037-basic-fast-import-e2e

This commit is contained in:
Gleb Novikov
2025-01-16 16:46:53 +00:00
97 changed files with 5910 additions and 1556 deletions

View File

@@ -1884,7 +1884,10 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tenant_list(self):
def tenant_shard_dump(self):
"""
Debug listing API: dumps the internal map of tenant shards
"""
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant",
@@ -1892,6 +1895,18 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tenant_list(self, **kwargs):
    """
    List tenants through the control API.

    The result is a vector of the same content returned by tenant_describe.
    Any keyword arguments (e.g. ``limit``, ``start_after``) are forwarded
    verbatim as query parameters.
    """
    url = f"{self.api}/control/v1/tenant"
    admin_headers = self.headers(TokenScope.ADMIN)
    return self.request("GET", url, headers=admin_headers, params=kwargs).json()
def node_configure(self, node_id, body: dict[str, Any]):
log.info(f"node_configure({node_id}, {body})")
body["node_id"] = node_id
@@ -2238,7 +2253,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
Get the intent and observed placements of all tenants known to the storage controller.
"""
tenants = self.tenant_list()
tenants = self.tenant_shard_dump()
tenant_placement: defaultdict[str, dict[str, Any]] = defaultdict(
lambda: {
@@ -2321,6 +2336,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
json=body,
)
def safekeeper_scheduling_policy(self, id: int, scheduling_policy: str):
    """
    POST a scheduling policy for safekeeper ``id`` to the control API.

    The safekeeper id is sent both in the URL path and in the JSON body.
    The response is not returned to the caller.
    """
    payload = {"id": id, "scheduling_policy": scheduling_policy}
    url = f"{self.api}/control/v1/safekeeper/{id}/scheduling_policy"
    self.request("POST", url, headers=self.headers(TokenScope.ADMIN), json=payload)
def get_safekeeper(self, id: int) -> dict[str, Any] | None:
try:
response = self.request(
@@ -4120,7 +4143,7 @@ class Endpoint(PgProtocol, LogUtils):
# Checkpoints running endpoint and returns pg_wal size in MB.
def get_pg_wal_size(self):
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
log.info(f"checkpointing at LSN {self.safe_psql('select pg_current_wal_lsn()')[0][0]}")
self.safe_psql("checkpoint")
assert self.pgdata_dir is not None # please mypy
return get_dir_size(self.pgdata_dir / "pg_wal") / 1024 / 1024
@@ -4960,7 +4983,7 @@ def logical_replication_sync(
if res:
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}")
if subscriber_lsn >= publisher_lsn:
return subscriber_lsn
time.sleep(0.5)

View File

@@ -15,7 +15,6 @@ from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fixtures.common_types import (
Id,
Lsn,
TenantId,
TenantShardId,
@@ -25,7 +24,7 @@ from fixtures.common_types import (
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.utils import Fn
from fixtures.utils import EnhancedJSONEncoder, Fn
class PageserverApiException(Exception):
@@ -83,14 +82,6 @@ class TimelineCreateRequest:
mode: TimelineCreateRequestMode
def to_json(self) -> str:
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o) and not isinstance(o, type):
return dataclasses.asdict(o)
elif isinstance(o, Id):
return o.id.hex()
return super().default(o)
# mode is flattened
this = dataclasses.asdict(self)
mode = this.pop("mode")

View File

@@ -10,7 +10,7 @@ import requests
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.utils import wait_until
from fixtures.utils import EnhancedJSONEncoder, wait_until
if TYPE_CHECKING:
from typing import Any
@@ -25,6 +25,7 @@ class Walreceiver:
@dataclass
class SafekeeperTimelineStatus:
mconf: Configuration | None
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
@@ -69,6 +70,56 @@ class TermBumpResponse:
)
@dataclass
class SafekeeperId:
    """Identity of a single safekeeper inside a membership configuration."""

    id: int
    host: str
    pg_port: int

    @classmethod
    def from_json(cls, d: dict[str, Any]) -> SafekeeperId:
        """Build a SafekeeperId from its decoded JSON object form."""
        return cls(id=d["id"], host=d["host"], pg_port=d["pg_port"])


@dataclass
class Configuration:
    """
    Timeline membership configuration.

    ``new_members`` is None unless a second member set is present.
    """

    generation: int
    members: list[SafekeeperId]
    new_members: list[SafekeeperId] | None

    @classmethod
    def from_json(cls, d: dict[str, Any]) -> Configuration:
        """
        Build a Configuration from a decoded JSON object.

        Member entries are parsed into SafekeeperId instances so the fields
        really hold what their annotations declare (previously the raw dicts
        were stored as-is). A missing or null ``new_members`` yields None.
        """
        members = [SafekeeperId.from_json(m) for m in d["members"]]
        raw_new_members = d.get("new_members")
        new_members = (
            None
            if raw_new_members is None
            else [SafekeeperId.from_json(m) for m in raw_new_members]
        )
        return cls(d["generation"], members, new_members)

    def to_json(self) -> str:
        """Serialize this configuration to a JSON string."""
        return json.dumps(self, cls=EnhancedJSONEncoder)
@dataclass
class TimelineCreateRequest:
    """Parameters of a safekeeper timeline creation request."""

    tenant_id: TenantId
    timeline_id: TimelineId
    mconf: Configuration
    # An integer version rather than a PgVersion, e.g. 150002 stands for 15.2.
    pg_version: int
    start_lsn: Lsn
    commit_lsn: Lsn | None

    def to_json(self) -> str:
        """Serialize the request to a JSON string."""
        encoded = json.dumps(self, cls=EnhancedJSONEncoder)
        return encoded
@dataclass
class TimelineMembershipSwitchResponse:
    """Result of a membership switch: the configuration before and after."""

    previous_conf: Configuration
    current_conf: Configuration

    @classmethod
    def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
        """Parse both configurations out of a decoded JSON response object."""
        return TimelineMembershipSwitchResponse(
            Configuration.from_json(d["previous_conf"]),
            Configuration.from_json(d["current_conf"]),
        )
class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError
@@ -131,20 +182,8 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
resj = res.json()
return [TenantTimelineId.from_json(ttidj) for ttidj in resj]
def timeline_create(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
commit_lsn: Lsn,
):
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"pg_version": pg_version,
"commit_lsn": str(commit_lsn),
}
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
def timeline_create(self, r: TimelineCreateRequest):
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", data=r.to_json())
res.raise_for_status()
def timeline_status(
@@ -154,7 +193,10 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
# Normally mconf is never None; None is allowed only to keep forward compatibility tests happy.
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
return SafekeeperTimelineStatus(
mconf=mconf,
term=resj["acceptor_state"]["term"],
last_log_term=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
@@ -180,6 +222,11 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
    """
    Get timeline membership configuration.

    Reads ``mconf`` from ``timeline_status``. The status field is optional,
    but this method promises a Configuration, so a missing value fails here
    with a clear message instead of leaking None to the caller.
    """
    mconf = self.timeline_status(tenant_id, timeline_id).mconf
    # Narrows the optional type for mypy and catches a missing configuration early.
    assert mconf is not None, f"no membership configuration for {tenant_id}/{timeline_id}"
    return mconf
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
@@ -226,6 +273,16 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def membership_switch(
    self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> TimelineMembershipSwitchResponse:
    """
    Ask the safekeeper to switch the timeline to membership configuration ``to``.

    Raises on a non-success HTTP status; otherwise returns the parsed
    previous/current configuration pair from the response.
    """
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership"
    response = self.post(url, data=to.to_json())
    response.raise_for_status()
    return TimelineMembershipSwitchResponse.from_json(response.json())
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import contextlib
import dataclasses
import json
import os
import re
@@ -21,6 +22,7 @@ import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override
from fixtures.common_types import Id, Lsn
from fixtures.log_helper import log
from fixtures.pageserver.common_types import (
parse_delta_layer,
@@ -605,6 +607,22 @@ class PropagatingThread(threading.Thread):
return self.ret
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that, on top of the builtin primitives handled by
    json.JSONEncoder, also understands dataclass instances and our custom
    Id and Lsn types.
    """

    def default(self, o):
        # Dataclass *instances* only; dataclass types themselves are not encodable.
        is_dataclass_instance = dataclasses.is_dataclass(o) and not isinstance(o, type)
        if is_dataclass_instance:
            return dataclasses.asdict(o)
        if isinstance(o, Id):
            return o.id.hex()
        if isinstance(o, Lsn):
            # standard hex notation
            return str(o)
        return super().default(o)
def human_bytes(amt: float) -> str:
"""
Render a bytes amount into nice IEC bytes string.

View File

@@ -53,6 +53,22 @@ class Workload:
self._endpoint: Endpoint | None = None
self._endpoint_opts = endpoint_opts or {}
def branch(
    self,
    timeline_id: TimelineId,
    branch_name: str | None = None,
    endpoint_opts: dict[str, Any] | None = None,
) -> Workload:
    """
    Build a Workload for a branch timeline, carrying over the current
    progress (expect_rows and churn_cursor) of this workload.
    """
    child = Workload(self.env, self.tenant_id, timeline_id, branch_name, endpoint_opts)
    # Snapshot the parent's state so the child starts where the parent is now.
    child.churn_cursor = self.churn_cursor
    child.expect_rows = self.expect_rows
    return child
def reconfigure(self) -> None:
"""
Request the endpoint to reconfigure based on location reported by storage controller

View File

@@ -112,7 +112,11 @@ page_cache_size=10
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"with_branches",
["with_branches", "no_branches"],
)
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_branches: str):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
@@ -143,12 +147,17 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
child_workloads: list[Workload] = []
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
if (i - 1) % 10 == 0:
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
if i % 10 == 5 and with_branches == "with_branches":
branch_name = f"child-{i}"
branch_timeline_id = env.create_branch(branch_name)
child_workloads.append(workload.branch(branch_timeline_id, branch_name))
if (i - 1) % 10 == 0 or (i - 1) % 10 == 1:
# Run gc-compaction twice every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
timeline_id,
@@ -179,6 +188,9 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
for child_workload in child_workloads:
log.info(f"Validating at branch {child_workload.branch_name}")
child_workload.validate(env.pageserver.id)
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

View File

@@ -4,9 +4,19 @@ import time
from fixtures.neon_fixtures import NeonEnv
BTREE_NUM_CYCLEID_PAGES = """
WITH raw_pages AS (
SELECT blkno, get_raw_page_at_lsn('t_uidx', 'main', blkno, NULL, NULL) page
FROM generate_series(1, pg_relation_size('t_uidx'::regclass) / 8192) blkno
WITH lsns AS (
/*
* pg_switch_wal() ensures we have an LSN that
* 1. is after any previous modifications, but also,
* 2. (critically) is flushed, preventing any issues with waiting for
* unflushed WAL in PageServer.
*/
SELECT pg_switch_wal() as lsn
),
raw_pages AS (
SELECT blkno, get_raw_page_at_lsn('t_uidx', 'main', blkno, lsn, lsn) page
FROM generate_series(1, pg_relation_size('t_uidx'::regclass) / 8192) AS blkno,
lsns l(lsn)
),
parsed_pages AS (
/* cycle ID is the last 2 bytes of the btree page */
@@ -36,7 +46,6 @@ def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
ses1.execute("CREATE UNIQUE INDEX t_uidx ON t(id);")
ses1.execute("INSERT INTO t (txt) SELECT i::text FROM generate_series(1, 2035) i;")
ses1.execute("SELECT neon_xlogflush();")
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert (
@@ -57,7 +66,6 @@ def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
ses1.execute("DELETE FROM t WHERE id <= 610;")
# Flush wal, for checking purposes
ses1.execute("SELECT neon_xlogflush();")
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert len(pages) == 0, f"No back splits with cycle ID expected, got batches of {pages} instead"
@@ -108,8 +116,6 @@ def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
# unpin the btree page, allowing s3's vacuum to complete
ses2.execute("FETCH ALL FROM foo;")
ses2.execute("ROLLBACK;")
# flush WAL to make sure PS is up-to-date
ses1.execute("SELECT neon_xlogflush();")
# check that our expectations are correct
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()

View File

@@ -113,6 +113,19 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
for tid in tenant_ids:
env.create_tenant(tid, shard_count=shards_per_tenant)
# Tenant listing API should work
listed_tenants = env.storage_controller.tenant_list()
log.info(f"listed_tenants: {listed_tenants}")
assert set(t["tenant_id"] for t in listed_tenants) == set(str(t) for t in tenant_ids)
paged = env.storage_controller.tenant_list(limit=2, start_after=listed_tenants[0]["tenant_id"])
assert len(paged) == 2
assert paged[0] == listed_tenants[1]
assert paged[1] == listed_tenants[2]
paged = env.storage_controller.tenant_list(
limit=1000, start_after="ffffffffffffffffffffffffffffffff"
)
assert paged == []
# Validate high level metrics
assert (
env.storage_controller.get_metric_value("storage_controller_tenant_shards")
@@ -1506,7 +1519,7 @@ class PageserverFailpoint(Failure):
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
tenants = env.storage_controller.tenant_shard_dump()
node_to_tenants: dict[int, list[TenantId]] = {}
for t in tenants:
@@ -2631,7 +2644,7 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
# Validate that the storcon attempts to forward the request, but stops
# when it realises it is still the current leader.
with pytest.raises(StorageControllerApiException, match="Leader is stepped down instance"):
env.storage_controller.tenant_list()
env.storage_controller.tenant_shard_dump()
# Validate that we can step down multiple times and the observed state
# doesn't change.
@@ -2781,7 +2794,7 @@ def test_storage_controller_leadership_transfer(
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_list()
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
@@ -3195,6 +3208,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert eq_safekeeper_records(body, inserted_now)
# some small tests for the scheduling policy querying and returning APIs
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Pause"
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Decomissioned"
# Ensure idempotency
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import pytest
@@ -253,29 +254,8 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "pause"))
def timeline_create():
try:
ps_http.timeline_create(env.pg_version, tenant_id, TimelineId.generate(), timeout=1)
raise RuntimeError("creation succeeded even though it shouldn't")
except ReadTimeout:
pass
Thread(target=timeline_create).start()
def hit_initdb_upload_failpoint():
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
wait_until(hit_initdb_upload_failpoint)
def creation_connection_timed_out():
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)
# Wait so that we hit the timeout and the connection is dropped
# (But timeline creation still continues)
wait_until(creation_connection_timed_out)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause"))
ps_http.timeline_create(env.pg_version, tenant_id, TimelineId.generate(), timeout=1)
raise RuntimeError("creation succeeded even though it shouldn't")
def tenant_delete():
def tenant_delete_inner():
@@ -283,21 +263,46 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
wait_until(tenant_delete_inner)
Thread(target=tenant_delete).start()
# We will spawn background threads for timeline creation and tenant deletion. They will both
# get blocked on our failpoint.
with ThreadPoolExecutor(max_workers=1) as executor:
create_fut = executor.submit(timeline_create)
def deletion_arrived():
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)
def hit_initdb_upload_failpoint():
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
wait_until(deletion_arrived)
wait_until(hit_initdb_upload_failpoint)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off"))
def creation_connection_timed_out():
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)
# Disable the failpoint and wait for deletion to finish
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "off"))
# Wait so that we hit the timeout and the connection is dropped
# (But timeline creation still continues)
wait_until(creation_connection_timed_out)
ps_http.tenant_delete(tenant_id)
with pytest.raises(ReadTimeout):
# Our creation failed from the client's point of view.
create_fut.result()
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause"))
delete_fut = executor.submit(tenant_delete)
def deletion_arrived():
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)
wait_until(deletion_arrived)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off"))
# Disable the failpoint and wait for deletion to finish
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "off"))
delete_fut.result()
# Physical deletion should have happened
assert_prefix_empty(

View File

@@ -48,7 +48,12 @@ from fixtures.remote_storage import (
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.http import (
Configuration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
PropagatingThread,
@@ -658,7 +663,13 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
mconf = Configuration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could use initdb LSN as well).
r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn("0/1000000"), commit_lsn=last_lsn
)
cli.timeline_create(r)
f_partial_path = (
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
@@ -2237,6 +2248,63 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
wait_until(unevicted_on_dest, interval=0.1, timeout=1.0)
# Basic test for the membership-related http API calls: create a timeline and
# switch its configuration. Normally these are called by the storage controller,
# but this allows testing them separately against a single safekeeper.
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
sk = env.safekeepers[0]
http_cli = sk.http_client()
# sk_id_1 describes the real safekeeper of this env; sk_id_2 is made up.
sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only)
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
# Create timeline.
# pg_version is passed as a plain int (150002 stands for 15.2); no commit_lsn is given.
create_r = TimelineCreateRequest(
tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None
)
log.info(f"sending {create_r.to_json()}")
http_cli.timeline_create(create_r)
# Switch into some conf.
# The conf carries both members and new_members, and its generation (4) is above the initial one (1).
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
assert resp.current_conf.generation == 4
# Restart sk, conf should be preserved.
sk.stop().start()
after_restart = http_cli.get_membership(tenant_id, timeline_id)
log.info(f"conf after restart: {after_restart}")
assert after_restart.generation == 4
# Switch into disjoint conf.
# Only the mock safekeeper remains a member and new_members is cleared.
non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 5
# Switch request to lower conf should be ignored.
# Generation 3 is below the current 5, so both previous and current must still report 5.
lower_conf = Configuration(generation=3, members=[], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
log.info(f"lower switch resp: {resp}")
assert resp.previous_conf.generation == 5
assert resp.current_conf.generation == 5
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt