Merge remote-tracking branch 'origin/main' into vlad/read-path-concurrent-io

This commit is contained in:
Christian Schwarz
2025-01-20 19:46:44 +01:00
100 changed files with 5316 additions and 1317 deletions

View File

@@ -374,6 +374,7 @@ class NeonEnvBuilder:
pageserver_config_override: str | Callable[[dict[str, Any]], None] | None = None,
num_safekeepers: int = 1,
num_pageservers: int = 1,
num_azs: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
@@ -405,6 +406,7 @@ class NeonEnvBuilder:
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.num_pageservers = num_pageservers
self.num_azs = num_azs
self.safekeepers_id_start = safekeepers_id_start
self.safekeepers_enable_fsync = safekeepers_enable_fsync
self.auth_enabled = auth_enabled
@@ -994,6 +996,7 @@ class NeonEnv:
self.endpoints = EndpointFactory(self)
self.safekeepers: list[Safekeeper] = []
self.pageservers: list[NeonPageserver] = []
self.num_azs = config.num_azs
self.broker = NeonBroker(self)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
@@ -1094,14 +1097,21 @@ class NeonEnv:
http=self.port_distributor.get_port(),
)
# Availabilty zones may also be configured manually with `NeonEnvBuilder.pageserver_config_override`
if self.num_azs > 1:
# Round-robin assignment of AZ names like us-east-2a, us-east-2b, etc.
az_prefix = DEFAULT_AZ_ID[:-1]
availability_zone = f"{az_prefix}{chr(ord('a') + (ps_id - 1) % self.num_azs)}"
else:
availability_zone = DEFAULT_AZ_ID
ps_cfg: dict[str, Any] = {
"id": ps_id,
"listen_pg_addr": f"localhost:{pageserver_port.pg}",
"listen_http_addr": f"localhost:{pageserver_port.http}",
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
# Default which can be overriden with `NeonEnvBuilder.pageserver_config_override`
"availability_zone": DEFAULT_AZ_ID,
"availability_zone": availability_zone,
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
"no_sync": True,
@@ -1895,7 +1905,10 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tenant_list(self):
def tenant_shard_dump(self):
"""
Debug listing API: dumps the internal map of tenant shards
"""
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant",
@@ -1903,6 +1916,18 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tenant_list(self, **kwargs):
"""
Control API tenant listing: a vector of the same content returned by tenant_describe
"""
response = self.request(
"GET",
f"{self.api}/control/v1/tenant",
headers=self.headers(TokenScope.ADMIN),
params=kwargs,
)
return response.json()
def node_configure(self, node_id, body: dict[str, Any]):
log.info(f"node_configure({node_id}, {body})")
body["node_id"] = node_id
@@ -2249,7 +2274,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
Get the intent and observed placements of all tenants known to the storage controller.
"""
tenants = self.tenant_list()
tenants = self.tenant_shard_dump()
tenant_placement: defaultdict[str, dict[str, Any]] = defaultdict(
lambda: {
@@ -2332,6 +2357,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
json=body,
)
def safekeeper_scheduling_policy(self, id: int, scheduling_policy: str):
self.request(
"POST",
f"{self.api}/control/v1/safekeeper/{id}/scheduling_policy",
headers=self.headers(TokenScope.ADMIN),
json={"id": id, "scheduling_policy": scheduling_policy},
)
def get_safekeeper(self, id: int) -> dict[str, Any] | None:
try:
response = self.request(
@@ -4131,7 +4164,7 @@ class Endpoint(PgProtocol, LogUtils):
# Checkpoints running endpoint and returns pg_wal size in MB.
def get_pg_wal_size(self):
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
log.info(f"checkpointing at LSN {self.safe_psql('select pg_current_wal_lsn()')[0][0]}")
self.safe_psql("checkpoint")
assert self.pgdata_dir is not None # please mypy
return get_dir_size(self.pgdata_dir / "pg_wal") / 1024 / 1024
@@ -4971,7 +5004,7 @@ def logical_replication_sync(
if res:
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}")
if subscriber_lsn >= publisher_lsn:
return subscriber_lsn
time.sleep(0.5)

View File

@@ -15,7 +15,6 @@ from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fixtures.common_types import (
Id,
Lsn,
TenantId,
TenantShardId,
@@ -25,7 +24,7 @@ from fixtures.common_types import (
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.utils import Fn
from fixtures.utils import EnhancedJSONEncoder, Fn
class PageserverApiException(Exception):
@@ -83,14 +82,6 @@ class TimelineCreateRequest:
mode: TimelineCreateRequestMode
def to_json(self) -> str:
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o) and not isinstance(o, type):
return dataclasses.asdict(o)
elif isinstance(o, Id):
return o.id.hex()
return super().default(o)
# mode is flattened
this = dataclasses.asdict(self)
mode = this.pop("mode")

View File

@@ -10,7 +10,7 @@ import requests
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.utils import wait_until
from fixtures.utils import EnhancedJSONEncoder, wait_until
if TYPE_CHECKING:
from typing import Any
@@ -25,6 +25,7 @@ class Walreceiver:
@dataclass
class SafekeeperTimelineStatus:
mconf: Configuration | None
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
@@ -69,6 +70,56 @@ class TermBumpResponse:
)
@dataclass
class SafekeeperId:
id: int
host: str
pg_port: int
@dataclass
class Configuration:
generation: int
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None
@classmethod
def from_json(cls, d: dict[str, Any]) -> Configuration:
generation = d["generation"]
members = d["members"]
new_members = d.get("new_members")
return Configuration(generation, members, new_members)
def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)
@dataclass
class TimelineCreateRequest:
tenant_id: TenantId
timeline_id: TimelineId
mconf: Configuration
# not exactly PgVersion, for example 150002 for 15.2
pg_version: int
start_lsn: Lsn
commit_lsn: Lsn | None
def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)
@dataclass
class TimelineMembershipSwitchResponse:
previous_conf: Configuration
current_conf: Configuration
@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
previous_conf = Configuration.from_json(d["previous_conf"])
current_conf = Configuration.from_json(d["current_conf"])
return TimelineMembershipSwitchResponse(previous_conf, current_conf)
class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError
@@ -131,20 +182,8 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
resj = res.json()
return [TenantTimelineId.from_json(ttidj) for ttidj in resj]
def timeline_create(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
commit_lsn: Lsn,
):
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"pg_version": pg_version,
"commit_lsn": str(commit_lsn),
}
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
def timeline_create(self, r: TimelineCreateRequest):
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", data=r.to_json())
res.raise_for_status()
def timeline_status(
@@ -154,7 +193,10 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
# It is always normally not None, it is allowed only to make forward compat tests happy.
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
return SafekeeperTimelineStatus(
mconf=mconf,
term=resj["acceptor_state"]["term"],
last_log_term=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
@@ -180,6 +222,11 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn
# Get timeline membership configuration.
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
# make mypy happy
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
@@ -226,6 +273,16 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> TimelineMembershipSwitchResponse:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",
data=to.to_json(),
)
res.raise_for_status()
return TimelineMembershipSwitchResponse.from_json(res.json())
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import contextlib
import dataclasses
import json
import os
import re
@@ -21,6 +22,7 @@ import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override
from fixtures.common_types import Id, Lsn
from fixtures.log_helper import log
from fixtures.pageserver.common_types import (
parse_delta_layer,
@@ -605,6 +607,22 @@ class PropagatingThread(threading.Thread):
return self.ret
class EnhancedJSONEncoder(json.JSONEncoder):
"""
Default json.JSONEncoder works only on primitive builtins. Extend it to any
dataclass plus our custom types.
"""
def default(self, o):
if dataclasses.is_dataclass(o) and not isinstance(o, type):
return dataclasses.asdict(o)
elif isinstance(o, Id):
return o.id.hex()
elif isinstance(o, Lsn):
return str(o) # standard hex notation
return super().default(o)
def human_bytes(amt: float) -> str:
"""
Render a bytes amount into nice IEC bytes string.

View File

@@ -53,6 +53,22 @@ class Workload:
self._endpoint: Endpoint | None = None
self._endpoint_opts = endpoint_opts or {}
def branch(
self,
timeline_id: TimelineId,
branch_name: str | None = None,
endpoint_opts: dict[str, Any] | None = None,
) -> Workload:
"""
Checkpoint the current status of the workload in case of branching
"""
branch_workload = Workload(
self.env, self.tenant_id, timeline_id, branch_name, endpoint_opts
)
branch_workload.expect_rows = self.expect_rows
branch_workload.churn_cursor = self.churn_cursor
return branch_workload
def reconfigure(self) -> None:
"""
Request the endpoint to reconfigure based on location reported by storage controller

View File

@@ -112,7 +112,11 @@ page_cache_size=10
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"with_branches",
["with_branches", "no_branches"],
)
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_branches: str):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
@@ -143,12 +147,17 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
child_workloads: list[Workload] = []
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
if (i - 1) % 10 == 0:
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
if i % 10 == 5 and with_branches == "with_branches":
branch_name = f"child-{i}"
branch_timeline_id = env.create_branch(branch_name)
child_workloads.append(workload.branch(branch_timeline_id, branch_name))
if (i - 1) % 10 == 0 or (i - 1) % 10 == 1:
# Run gc-compaction twice every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
timeline_id,
@@ -179,6 +188,9 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
for child_workload in child_workloads:
log.info(f"Validating at branch {child_workload.branch_name}")
child_workload.validate(env.pageserver.id)
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

View File

@@ -7,9 +7,78 @@ import threading
import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import USE_LFC, query_scalar
"""
Test whether LFC doesn't error out when the LRU is empty, but the LFC is
already at its maximum size.
If we don't handle this safely, we might allocate more hash entries than
otherwise considered safe, thus causing ERRORs in hash_search(HASH_ENTER) once
we hit lfc->used >= lfc->limit.
"""
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_all_pinned(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size='1MB'",
"neon.file_cache_size_limit='1MB'",
],
)
top_cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 10000
n_threads = 5
n_updates_per_connection = 1000
top_cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
top_cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
# Start threads that will perform random UPDATEs. Each UPDATE
# increments the counter on the row, so that we can check at the
# end that the sum of all the counters match the number of updates
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates(n_updates_performed_q: queue.Queue[int]):
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
while not stop.is_set():
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
if n_updates_performed % n_updates_per_connection == 0:
cur.close()
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
n_updates_performed_q.put(n_updates_performed)
n_updates_performed_q: queue.Queue[int] = queue.Queue()
threads: list[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
thread.start()
threads.append(thread)
time.sleep(15)
stop.set()
n_updates_performed = 0
for thread in threads:
thread.join()
n_updates_performed += n_updates_performed_q.get()
assert query_scalar(top_cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):

View File

@@ -4,9 +4,19 @@ import time
from fixtures.neon_fixtures import NeonEnv
BTREE_NUM_CYCLEID_PAGES = """
WITH raw_pages AS (
SELECT blkno, get_raw_page_at_lsn('t_uidx', 'main', blkno, NULL, NULL) page
FROM generate_series(1, pg_relation_size('t_uidx'::regclass) / 8192) blkno
WITH lsns AS (
/*
* pg_switch_wal() ensures we have an LSN that
* 1. is after any previous modifications, but also,
* 2. (critically) is flushed, preventing any issues with waiting for
* unflushed WAL in PageServer.
*/
SELECT pg_switch_wal() as lsn
),
raw_pages AS (
SELECT blkno, get_raw_page_at_lsn('t_uidx', 'main', blkno, lsn, lsn) page
FROM generate_series(1, pg_relation_size('t_uidx'::regclass) / 8192) AS blkno,
lsns l(lsn)
),
parsed_pages AS (
/* cycle ID is the last 2 bytes of the btree page */
@@ -36,7 +46,6 @@ def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
ses1.execute("CREATE UNIQUE INDEX t_uidx ON t(id);")
ses1.execute("INSERT INTO t (txt) SELECT i::text FROM generate_series(1, 2035) i;")
ses1.execute("SELECT neon_xlogflush();")
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert (
@@ -57,7 +66,6 @@ def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
ses1.execute("DELETE FROM t WHERE id <= 610;")
# Flush wal, for checking purposes
ses1.execute("SELECT neon_xlogflush();")
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert len(pages) == 0, f"No back splits with cycle ID expected, got batches of {pages} instead"
@@ -108,8 +116,6 @@ def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
# unpin the btree page, allowing s3's vacuum to complete
ses2.execute("FETCH ALL FROM foo;")
ses2.execute("ROLLBACK;")
# flush WAL to make sure PS is up-to-date
ses1.execute("SELECT neon_xlogflush();")
# check that our expectations are correct
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()

View File

@@ -113,6 +113,19 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
for tid in tenant_ids:
env.create_tenant(tid, shard_count=shards_per_tenant)
# Tenant listing API should work
listed_tenants = env.storage_controller.tenant_list()
log.info(f"listed_tenants: {listed_tenants}")
assert set(t["tenant_id"] for t in listed_tenants) == set(str(t) for t in tenant_ids)
paged = env.storage_controller.tenant_list(limit=2, start_after=listed_tenants[0]["tenant_id"])
assert len(paged) == 2
assert paged[0] == listed_tenants[1]
assert paged[1] == listed_tenants[2]
paged = env.storage_controller.tenant_list(
limit=1000, start_after="ffffffffffffffffffffffffffffffff"
)
assert paged == []
# Validate high level metrics
assert (
env.storage_controller.get_metric_value("storage_controller_tenant_shards")
@@ -1506,7 +1519,7 @@ class PageserverFailpoint(Failure):
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
tenants = env.storage_controller.tenant_shard_dump()
node_to_tenants: dict[int, list[TenantId]] = {}
for t in tenants:
@@ -2381,6 +2394,7 @@ def test_storage_controller_node_deletion(
Test that deleting a node works & properly reschedules everything that was on the node.
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.num_azs = 3
env = neon_env_builder.init_configs()
env.start()
@@ -2394,6 +2408,9 @@ def test_storage_controller_node_deletion(
tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
)
# Sanity check: initial creations should not leave the system in an unstable scheduling state
assert env.storage_controller.reconcile_all() == 0
victim = env.pageservers[-1]
# The procedure a human would follow is:
@@ -2631,7 +2648,7 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
# Validate that the storcon attempts to forward the request, but stops.
# when it realises it is still the current leader.
with pytest.raises(StorageControllerApiException, match="Leader is stepped down instance"):
env.storage_controller.tenant_list()
env.storage_controller.tenant_shard_dump()
# Validate that we can step down multiple times and the observed state
# doesn't change.
@@ -2781,7 +2798,7 @@ def test_storage_controller_leadership_transfer(
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_list()
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
@@ -3195,6 +3212,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert eq_safekeeper_records(body, inserted_now)
# some small tests for the scheduling policy querying and returning APIs
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Pause"
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Decomissioned"
# Ensure idempotency
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import pytest
@@ -253,29 +254,8 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "pause"))
def timeline_create():
try:
ps_http.timeline_create(env.pg_version, tenant_id, TimelineId.generate(), timeout=1)
raise RuntimeError("creation succeeded even though it shouldn't")
except ReadTimeout:
pass
Thread(target=timeline_create).start()
def hit_initdb_upload_failpoint():
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
wait_until(hit_initdb_upload_failpoint)
def creation_connection_timed_out():
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)
# Wait so that we hit the timeout and the connection is dropped
# (But timeline creation still continues)
wait_until(creation_connection_timed_out)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause"))
ps_http.timeline_create(env.pg_version, tenant_id, TimelineId.generate(), timeout=1)
raise RuntimeError("creation succeeded even though it shouldn't")
def tenant_delete():
def tenant_delete_inner():
@@ -283,21 +263,46 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
wait_until(tenant_delete_inner)
Thread(target=tenant_delete).start()
# We will spawn background threads for timeline creation and tenant deletion. They will both
# get blocked on our failpoint.
with ThreadPoolExecutor(max_workers=1) as executor:
create_fut = executor.submit(timeline_create)
def deletion_arrived():
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)
def hit_initdb_upload_failpoint():
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
wait_until(deletion_arrived)
wait_until(hit_initdb_upload_failpoint)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off"))
def creation_connection_timed_out():
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)
# Disable the failpoint and wait for deletion to finish
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "off"))
# Wait so that we hit the timeout and the connection is dropped
# (But timeline creation still continues)
wait_until(creation_connection_timed_out)
ps_http.tenant_delete(tenant_id)
with pytest.raises(ReadTimeout):
# Our creation failed from the client's point of view.
create_fut.result()
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause"))
delete_fut = executor.submit(tenant_delete)
def deletion_arrived():
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)
wait_until(deletion_arrived)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off"))
# Disable the failpoint and wait for deletion to finish
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "off"))
delete_fut.result()
# Physical deletion should have happened
assert_prefix_empty(

View File

@@ -194,7 +194,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
io_metrics = query_all_safekeepers(
"safekeeper_pg_io_bytes_total",
{
"app_name": "pageserver",
"app_name": f"pageserver-{env.pageserver.id}",
"client_az": "test_ps_az",
"dir": io_direction,
"same_az": "false",

View File

@@ -48,7 +48,12 @@ from fixtures.remote_storage import (
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.http import (
Configuration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
PropagatingThread,
@@ -658,7 +663,13 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
mconf = Configuration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could you intidb LSN as well).
r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn("0/1000000"), commit_lsn=last_lsn
)
cli.timeline_create(r)
f_partial_path = (
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
@@ -2237,6 +2248,63 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
wait_until(unevicted_on_dest, interval=0.1, timeout=1.0)
# Basic test for http API membership related calls: create timeline and switch
# configuration. Normally these are called by storage controller, but this
# allows to test them separately.
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
sk = env.safekeepers[0]
http_cli = sk.http_client()
sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only)
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
# Create timeline.
create_r = TimelineCreateRequest(
tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None
)
log.info(f"sending {create_r.to_json()}")
http_cli.timeline_create(create_r)
# Switch into some conf.
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
assert resp.current_conf.generation == 4
# Restart sk, conf should be preserved.
sk.stop().start()
after_restart = http_cli.get_membership(tenant_id, timeline_id)
log.info(f"conf after restart: {after_restart}")
assert after_restart.generation == 4
# Switch into disjoint conf.
non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 5
# Switch request to lower conf should be ignored.
lower_conf = Configuration(generation=3, members=[], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
log.info(f"lower switch resp: {resp}")
assert resp.previous_conf.generation == 5
assert resp.current_conf.generation == 5
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt