Merge branch 'main' into cicd/debug-regress-tests-on-arm

This commit is contained in:
Andrey Taranik
2024-08-17 17:49:47 +03:00
20 changed files with 1401 additions and 314 deletions

View File

@@ -3,6 +3,7 @@ pytest_plugins = (
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -497,6 +497,7 @@ class NeonEnvBuilder:
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None,
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -549,6 +550,8 @@ class NeonEnvBuilder:
self.safekeeper_extra_opts = safekeeper_extra_opts
self.storage_controller_port_override = storage_controller_port_override
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1054,6 +1057,7 @@ class NeonEnv:
"""
BASE_PAGESERVER_ID = 1
storage_controller: NeonStorageController | NeonProxiedStorageController
def __init__(self, config: NeonEnvBuilder):
self.repo_dir = config.repo_dir
@@ -1084,27 +1088,41 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
# Find two adjacent ports for storage controller and its postgres DB. This
# loop would eventually throw from get_port() if we run out of ports (extremely
# unlikely): usually we find two adjacent free ports on the first iteration.
while True:
self.storage_controller_port = self.port_distributor.get_port()
storage_controller_pg_port = self.port_distributor.get_port()
if storage_controller_pg_port == self.storage_controller_port + 1:
break
# The URL for the pageserver to use as its control_plane_api config
self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1"
# The base URL of the storage controller
self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}"
if config.storage_controller_port_override is not None:
log.info(
f"Using storage controller api override {config.storage_controller_port_override}"
)
self.storage_controller_port = config.storage_controller_port_override
self.storage_controller = NeonProxiedStorageController(
self, config.storage_controller_port_override, config.auth_enabled
)
else:
# Find two adjacent ports for storage controller and its postgres DB. This
# loop would eventually throw from get_port() if we run out of ports (extremely
# unlikely): usually we find two adjacent free ports on the first iteration.
while True:
storage_controller_port = self.port_distributor.get_port()
storage_controller_pg_port = self.port_distributor.get_port()
if storage_controller_pg_port == storage_controller_port + 1:
break
self.storage_controller_port = storage_controller_port
self.storage_controller = NeonStorageController(
self, storage_controller_port, config.auth_enabled
)
log.info(
f"Using generated control_plane_api: {self.storage_controller.upcall_api_endpoint()}"
)
self.storage_controller_api: str = self.storage_controller.api_root()
self.control_plane_api: str = self.storage_controller.upcall_api_endpoint()
# For testing this with a fake HTTP server, enable passing through a URL from config
self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
self.storage_controller: NeonStorageController = NeonStorageController(
self, config.auth_enabled
)
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
@@ -1869,16 +1887,24 @@ class NeonCli(AbstractNeonCli):
def storage_controller_start(
self,
timeout_in_seconds: Optional[int] = None,
instance_id: Optional[int] = None,
base_port: Optional[int] = None,
):
cmd = ["storage_controller", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
if instance_id is not None:
cmd.append(f"--instance-id={instance_id}")
if base_port is not None:
cmd.append(f"--base-port={base_port}")
return self.raw_cli(cmd)
def storage_controller_stop(self, immediate: bool):
def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None):
cmd = ["storage_controller", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
if instance_id is not None:
cmd.append(f"--instance-id={instance_id}")
return self.raw_cli(cmd)
def pageserver_start(
@@ -2189,17 +2215,30 @@ class PageserverSchedulingPolicy(str, Enum):
PAUSE_FOR_RESTART = "PauseForRestart"
class StorageControllerLeadershipStatus(str, Enum):
LEADER = "leader"
STEPPED_DOWN = "stepped_down"
CANDIDATE = "candidate"
class NeonStorageController(MetricsGetter, LogUtils):
def __init__(self, env: NeonEnv, auth_enabled: bool):
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
self.env = env
self.port: int = port
self.api: str = f"http://127.0.0.1:{port}"
self.running = False
self.auth_enabled = auth_enabled
self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS
self.logfile = self.workdir / "storage_controller.log"
self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log"
def start(self, timeout_in_seconds: Optional[int] = None):
def start(
self,
timeout_in_seconds: Optional[int] = None,
instance_id: Optional[int] = None,
base_port: Optional[int] = None,
):
assert not self.running
self.env.neon_cli.storage_controller_start(timeout_in_seconds)
self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port)
self.running = True
return self
@@ -2209,6 +2248,12 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.running = False
return self
def upcall_api_endpoint(self) -> str:
return f"{self.api}/upcall/v1"
def api_root(self) -> str:
return self.api
@staticmethod
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
@@ -2237,7 +2282,9 @@ class NeonStorageController(MetricsGetter, LogUtils):
def assert_no_errors(self):
assert_no_errors(
self.env.repo_dir / "storage_controller.log", "storage_controller", self.allowed_errors
self.logfile,
"storage_controller",
self.allowed_errors,
)
def pageserver_api(self) -> PageserverHttpClient:
@@ -2249,7 +2296,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
auth_token = None
if self.auth_enabled:
auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API)
return PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token)
return PageserverHttpClient(self.port, lambda: True, auth_token)
def request(self, method, *args, **kwargs) -> requests.Response:
resp = requests.request(method, *args, **kwargs)
@@ -2266,13 +2313,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
return headers
def get_metrics(self) -> Metrics:
res = self.request("GET", f"{self.env.storage_controller_api}/metrics")
res = self.request("GET", f"{self.api}/metrics")
return parse_metrics(res.text)
def ready(self) -> bool:
status = None
try:
resp = self.request("GET", f"{self.env.storage_controller_api}/ready")
resp = self.request("GET", f"{self.api}/ready")
status = resp.status_code
except StorageControllerApiException as e:
status = e.status_code
@@ -2305,7 +2352,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response = self.request(
"POST",
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
f"{self.api}/debug/v1/attach-hook",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2316,7 +2363,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]):
self.request(
"POST",
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
f"{self.api}/debug/v1/attach-hook",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": None},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2327,7 +2374,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
response = self.request(
"POST",
f"{self.env.storage_controller_api}/debug/v1/inspect",
f"{self.api}/debug/v1/inspect",
json={"tenant_shard_id": str(tenant_shard_id)},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2350,7 +2397,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_register({body})")
self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/node",
f"{self.api}/control/v1/node",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2359,7 +2406,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_delete({node_id})")
self.request(
"DELETE",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
f"{self.api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2367,7 +2414,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_drain({node_id})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
f"{self.api}/control/v1/node/{node_id}/drain",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2375,7 +2422,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"cancel_node_drain({node_id})")
self.request(
"DELETE",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
f"{self.api}/control/v1/node/{node_id}/drain",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2383,7 +2430,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_fill({node_id})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
f"{self.api}/control/v1/node/{node_id}/fill",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2391,14 +2438,22 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"cancel_node_fill({node_id})")
self.request(
"DELETE",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
f"{self.api}/control/v1/node/{node_id}/fill",
headers=self.headers(TokenScope.ADMIN),
)
def node_status(self, node_id):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
f"{self.api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def get_leader(self):
response = self.request(
"GET",
f"{self.api}/control/v1/leader",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2406,7 +2461,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def node_list(self):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/node",
f"{self.api}/control/v1/node",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2414,7 +2469,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_list(self):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/debug/v1/tenant",
f"{self.api}/debug/v1/tenant",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2424,7 +2479,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
body["node_id"] = node_id
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config",
f"{self.api}/control/v1/node/{node_id}/config",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2459,7 +2514,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response = self.request(
"POST",
f"{self.env.storage_controller_api}/v1/tenant",
f"{self.api}/v1/tenant",
json=body,
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
@@ -2472,7 +2527,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
response = self.request(
"GET",
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/locate",
f"{self.api}/debug/v1/tenant/{tenant_id}/locate",
headers=self.headers(TokenScope.ADMIN),
)
body = response.json()
@@ -2485,7 +2540,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}",
f"{self.api}/control/v1/tenant/{tenant_id}",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
@@ -2496,7 +2551,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
) -> list[TenantShardId]:
response = self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split",
f"{self.api}/control/v1/tenant/{tenant_id}/shard_split",
json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2508,7 +2563,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate",
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2519,7 +2574,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"tenant_policy_update({tenant_id}, {body})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy",
f"{self.api}/control/v1/tenant/{tenant_id}/policy",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2527,14 +2582,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_import(self, tenant_id: TenantId):
self.request(
"POST",
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import",
f"{self.api}/debug/v1/tenant/{tenant_id}/import",
headers=self.headers(TokenScope.ADMIN),
)
def reconcile_all(self):
r = self.request(
"POST",
f"{self.env.storage_controller_api}/debug/v1/reconcile_all",
f"{self.api}/debug/v1/reconcile_all",
headers=self.headers(TokenScope.ADMIN),
)
r.raise_for_status()
@@ -2567,7 +2622,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
self.request(
"POST",
f"{self.env.storage_controller_api}/debug/v1/consistency_check",
f"{self.api}/debug/v1/consistency_check",
headers=self.headers(TokenScope.ADMIN),
)
log.info("storage controller passed consistency check")
@@ -2640,7 +2695,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/metadata_health/update",
f"{self.api}/control/v1/metadata_health/update",
json=body,
headers=self.headers(TokenScope.SCRUBBER),
)
@@ -2648,7 +2703,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def metadata_health_list_unhealthy(self):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy",
f"{self.api}/control/v1/metadata_health/unhealthy",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2658,7 +2713,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response = self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated",
f"{self.api}/control/v1/metadata_health/outdated",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2681,7 +2736,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info("Asking storage controller to step down")
response = self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/step_down",
f"{self.api}/control/v1/step_down",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2698,7 +2753,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
res = self.request(
"PUT",
f"{self.env.storage_controller_api}/debug/v1/failpoints",
f"{self.api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
@@ -2768,9 +2823,21 @@ class NeonStorageController(MetricsGetter, LogUtils):
parsed_tid, wait_ms=250
)
@property
def workdir(self) -> Path:
return self.env.repo_dir
def get_leadership_status(self) -> StorageControllerLeadershipStatus:
metric_values = {}
for status in StorageControllerLeadershipStatus:
metric_value = self.get_metric_value(
"storage_controller_leadership_status", filter={"status": status}
)
metric_values[status] = metric_value
assert list(metric_values.values()).count(1) == 1
for status, metric_value in metric_values.items():
if metric_value == 1:
return status
raise AssertionError("unreachable")
def __enter__(self) -> "NeonStorageController":
return self
@@ -2784,6 +2851,59 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.stop(immediate=True)
class NeonProxiedStorageController(NeonStorageController):
def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool):
super(NeonProxiedStorageController, self).__init__(env, proxy_port, auth_enabled)
self.instances: dict[int, dict[str, Any]] = {}
def start(
self,
timeout_in_seconds: Optional[int] = None,
instance_id: Optional[int] = None,
base_port: Optional[int] = None,
):
assert instance_id is not None and base_port is not None
self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port)
self.instances[instance_id] = {"running": True}
self.running = True
return self
def stop_instance(
self, immediate: bool = False, instance_id: Optional[int] = None
) -> "NeonStorageController":
assert instance_id in self.instances
if self.instances[instance_id]["running"]:
self.env.neon_cli.storage_controller_stop(immediate, instance_id)
self.instances[instance_id]["running"] = False
self.running = any(meta["running"] for meta in self.instances.values())
return self
def stop(self, immediate: bool = False) -> "NeonStorageController":
for iid, details in self.instances.items():
if details["running"]:
self.env.neon_cli.storage_controller_stop(immediate, iid)
self.instances[iid]["running"] = False
self.running = False
return self
def assert_no_errors(self):
for instance_id in self.instances.keys():
assert_no_errors(
self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log",
"storage_controller",
self.allowed_errors,
)
def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Optional[Tuple[str, LogCursor]]:
raise NotImplementedError()
@dataclass
class LogCursor:
_line_no: int
@@ -4520,7 +4640,7 @@ class StorageScrubber:
base_args = [
str(self.env.neon_binpath / "storage_scrubber"),
f"--controller-api={self.env.storage_controller_api}",
f"--controller-api={self.env.storage_controller.api_root()}",
]
args = base_args + args

View File

@@ -0,0 +1,73 @@
import re
from typing import Any, Optional
import pytest
import requests
from pytest_httpserver import HTTPServer
from werkzeug.datastructures import Headers
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.log_helper import log
class StorageControllerProxy:
    """Holds the proxy's listen address and the currently selected routing target.

    The actual request forwarding is done by the `storage_controller_proxy`
    fixture's handler; this object only tracks where requests should go.
    """

    def __init__(self, server: HTTPServer):
        self.server: HTTPServer = server
        host, port = server.host, server.port
        self.listen: str = "http://{}:{}".format(host, port)
        # No target selected yet; the handler refuses requests until one is set.
        self.routing_to: Optional[str] = None

    def route_to(self, storage_controller_api: str):
        # Select which storage controller instance receives proxied requests.
        self.routing_to = storage_controller_api

    def port(self) -> int:
        return self.server.port

    def upcall_api_endpoint(self) -> str:
        # Pageservers use this endpoint as their control_plane_api.
        return self.listen + "/upcall/v1"
def proxy_request(method: str, url: str, **kwargs) -> requests.Response:
    """Forward one HTTP request to *url* and return the raw response.

    Thin wrapper over `requests.request` so the forwarding step is a single
    named seam (e.g. for monkeypatching in tests).
    """
    return requests.request(method, url, **kwargs)
@pytest.fixture(scope="function")
def storage_controller_proxy(make_httpserver):
    """
    Proxies requests into the storage controller to the currently
    selected storage controller instance via `StorageControllerProxy.route_to`.

    This fixture is intended for tests that need to run multiple instances
    of the storage controller at the same time.
    """
    server = make_httpserver
    self = StorageControllerProxy(server)

    log.info(f"Storage controller proxy listening on {self.listen}")

    def handler(request: Request):
        # Bug fix: the routing target is stored in `self.routing_to`.
        # The previous check (`self.route_to is None`) compared against the
        # bound *method*, which is never None, so the 503 branch was
        # unreachable and an unconfigured proxy would build "None/..." URLs.
        if self.routing_to is None:
            log.info(f"Storage controller proxy has no routing configured for {request.url}")
            return Response("Routing not configured", status=503)

        route_to_url = f"{self.routing_to}{request.path}"
        log.info(f"Routing {request.url} to {route_to_url}")

        # Forward headers verbatim; include the JSON body only when present.
        args: dict[str, Any] = {"headers": request.headers}
        if request.is_json:
            args["json"] = request.json

        response = proxy_request(request.method, route_to_url, **args)

        # Copy response headers back one by one (Headers supports duplicates).
        headers = Headers()
        for key, value in response.headers.items():
            headers.add(key, value)

        return Response(response.content, headers=headers, status=response.status_code)

    # Catch-all route: every path is forwarded through the handler above.
    self.server.expect_request(re.compile(".*")).respond_with_handler(handler)

    yield self

    server.clear()

View File

@@ -403,7 +403,7 @@ def wait_until(
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
log.info("waiting for %s iteration %s failed: %s", func, i + 1, e)
last_exception = e
if show_intermediate_error:
log.info(e)

View File

@@ -282,15 +282,16 @@ def test_snap_files(
env = benchmark_project_pub.pgbench_env
connstr = benchmark_project_pub.connstr
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env)
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cur.fetchall()[0]
is_super = cur.fetchall()[0][0]
assert is_super, "This benchmark won't work if we don't have superuser"
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env)
conn = psycopg2.connect(connstr)
conn.autocommit = True
cur = conn.cursor()

View File

@@ -1,3 +1,7 @@
import os
import random
import re
import subprocess
import threading
import time
@@ -17,17 +21,17 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
"test_lfc_resize",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
],
)
n_resize = 10
scale = 10
scale = 100
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr])
pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()
@@ -35,9 +39,21 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
conn = endpoint.connect()
cur = conn.cursor()
for i in range(n_resize):
cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'")
for _ in range(n_resize):
size = random.randint(1, 512)
cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
cur.execute("alter system set neon.file_cache_size_limit='100MB'")
cur.execute("select pg_reload_conf()")
thread.join()
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
lfc_file_size = os.path.getsize(lfc_file_path)
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
assert lfc_file_size <= 512 * 1024 * 1024
assert int(lfc_file_blocks) <= 128 * 1024

View File

@@ -1,3 +1,4 @@
import concurrent.futures
import json
import threading
import time
@@ -16,6 +17,7 @@ from fixtures.neon_fixtures import (
PageserverSchedulingPolicy,
PgBin,
StorageControllerApiException,
StorageControllerLeadershipStatus,
TokenScope,
last_flush_lsn_upload,
)
@@ -30,7 +32,9 @@ from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.storage_controller_proxy import StorageControllerProxy
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
from fixtures.workload import Workload
from mypy_boto3_s3.type_defs import (
@@ -2093,6 +2097,131 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
)
# This is a copy of NeonEnv.start which injects the instance id and port
# into the call to NeonStorageController.start
def start_env(env: NeonEnv, storage_controller_port: int):
    """
    Start all services in *env*, booting the storage controller as
    instance 1 on the explicitly supplied base port (instead of the
    port NeonEnv.start would use).
    """
    timeout_in_seconds = 30

    # Storage controller starts first, so that pageserver /re-attach calls don't
    # bounce through retries on startup
    env.storage_controller.start(timeout_in_seconds, 1, storage_controller_port)

    # Wait for storage controller readiness to prevent unnecessary post start-up
    # reconcile.
    env.storage_controller.wait_until_ready()

    # Start up broker, pageserver and all safekeepers
    futs = []
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=2 + len(env.pageservers) + len(env.safekeepers)
    ) as executor:
        futs.append(
            executor.submit(lambda: env.broker.try_start() or None)
        )  # The `or None` is for the linter

        for pageserver in env.pageservers:
            futs.append(
                executor.submit(
                    # `ps=pageserver` binds the loop variable as a default arg so
                    # each lambda starts its own pageserver (avoids the
                    # late-binding-closure pitfall).
                    lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds)
                )
            )

        for safekeeper in env.safekeepers:
            futs.append(
                executor.submit(
                    lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds)
                )
            )

        # Collect results so any startup exception in a worker thread is re-raised here.
        for f in futs:
            f.result()
@pytest.mark.parametrize("step_down_times_out", [False, True])
def test_storage_controller_leadership_transfer(
    neon_env_builder: NeonEnvBuilder,
    storage_controller_proxy: StorageControllerProxy,
    port_distributor: PortDistributor,
    step_down_times_out: bool,
):
    """
    Run two storage controller instances behind a proxy and verify that
    leadership transfers from instance 1 to instance 2 — both when the old
    leader steps down cleanly and when its step-down handling is forced to
    time out via a failpoint.
    """
    neon_env_builder.num_pageservers = 3
    neon_env_builder.storage_controller_config = {
        "database_url": f"127.0.0.1:{port_distributor.get_port()}",
        "start_as_candidate": True,
    }
    # Route all storage controller API traffic through the proxy so the test
    # can re-point it at whichever instance should currently serve requests.
    neon_env_builder.storage_controller_port_override = storage_controller_proxy.port()

    storage_controller_1_port = port_distributor.get_port()
    storage_controller_2_port = port_distributor.get_port()
    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")

    env = neon_env_builder.init_configs()
    start_env(env, storage_controller_1_port)

    # Instance 1 should come up as leader and report its own address.
    assert (
        env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER
    )
    leader = env.storage_controller.get_leader()
    assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/"

    if step_down_times_out:
        # Make instance 1 hang in its step-down handler (10s sleep) so the new
        # instance must assume leadership without a clean hand-over.
        env.storage_controller.configure_failpoints(
            ("sleep-on-step-down-handling", "return(10000)")
        )
        env.storage_controller.allowed_errors.append(".*request was dropped before completing.*")

    # Seed some tenants/shards so the new leader has state to take over.
    tenant_count = 2
    shard_count = 4
    tenants = set(TenantId.generate() for _ in range(0, tenant_count))

    for tid in tenants:
        env.storage_controller.tenant_create(
            tid, shard_count=shard_count, placement_policy={"Attached": 1}
        )

    env.storage_controller.reconcile_until_idle()

    # Boot instance 2; it requests step-down from the current leader.
    env.storage_controller.start(
        timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
    )

    if not step_down_times_out:

        def previous_stepped_down():
            assert (
                env.storage_controller.get_leadership_status()
                == StorageControllerLeadershipStatus.STEPPED_DOWN
            )

        wait_until(5, 1, previous_stepped_down)

    # Re-point the proxy so subsequent API calls reach instance 2.
    storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")

    def new_becomes_leader():
        assert (
            env.storage_controller.get_leadership_status()
            == StorageControllerLeadershipStatus.LEADER
        )

    wait_until(15, 1, new_becomes_leader)

    leader = env.storage_controller.get_leader()
    assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/"

    env.storage_controller.wait_until_ready()
    env.storage_controller.consistency_check()

    if step_down_times_out:
        # The forced timeout produces expected error noise during hand-over.
        env.storage_controller.allowed_errors.extend(
            [
                ".*Leader.*did not respond to step-down request.*",
                ".*Send step down request failed.*",
                ".*Send step down request still failed.*",
            ]
        )
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
# single unsharded tenant, two locations
neon_env_builder.num_pageservers = 2