Merge remote-tracking branch 'origin/main' into problame/hung-shutdown/demo-hypothesis

This commit is contained in:
Christian Schwarz
2025-01-14 22:33:20 +01:00
84 changed files with 4368 additions and 2399 deletions

View File

@@ -131,7 +131,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_layers_visited_per_read_global"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import concurrent.futures
import re
import threading
from pathlib import Path
import pytest
@@ -188,7 +189,20 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
check_pgbench_output(out_path)
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads:
stop_pump = threading.Event()
def pump_controller():
# Run a background loop to force the storage controller to run its
# background work faster than it otherwise would: this helps
# us:
# A) to create a test that runs in a shorter time
# B) to create a test that is more intensive by doing the shard migrations
# after splits happen more rapidly.
while not stop_pump.is_set():
env.storage_controller.reconcile_all()
stop_pump.wait(0.1)
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count + 1) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_init, tenant_state.endpoint)
@@ -198,6 +212,8 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
for fut in pgbench_futs:
fut.result()
pump_fut = pgbench_threads.submit(pump_controller)
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_main, tenant_state.endpoint)
@@ -207,6 +223,9 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
for fut in pgbench_futs:
fut.result()
stop_pump.set()
pump_fut.result()
def assert_all_split():
for tenant_id in tenants.keys():
shards = tenant_get_shards(env, tenant_id)

View File

@@ -13,11 +13,13 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PageserverAvailability,
PageserverSchedulingPolicy,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pg_version import PgVersion
from fixtures.utils import wait_until
def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]:
@@ -85,8 +87,12 @@ def test_storage_controller_many_tenants(
)
AZS = ["alpha", "bravo", "charlie"]
def az_selector(node_id):
return f"az-{AZS[(node_id - 1) % len(AZS)]}"
neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update(
{"availability_zone": f"az-{AZS[ps_cfg['id'] % len(AZS)]}"}
{"availability_zone": az_selector(ps_cfg["id"])}
)
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
@@ -168,6 +174,31 @@ def test_storage_controller_many_tenants(
log.info(f"Resident memory: {rss} ({ rss / total_shards} per shard)")
assert rss < expect_memory_per_shard * total_shards
def assert_all_tenants_scheduled_in_home_az():
for tenant_id in tenant_ids:
desc = env.storage_controller.tenant_describe(tenant_id)
preferred_az = None
for shard in desc["shards"]:
# All shards in a tenant should have the same preferred AZ
if preferred_az is None:
preferred_az = shard["preferred_az_id"]
else:
assert preferred_az == shard["preferred_az_id"]
# Attachment should be in the preferred AZ
assert shard["preferred_az_id"] == az_selector(
shard["node_attached"]
), f"Shard {shard['tenant_shard_id']} not in {shard['preferred_az_id']}"
# Secondary locations should not be in the preferred AZ
for node_secondary in shard["node_secondary"]:
assert (
shard["preferred_az_id"] != az_selector(node_secondary)
), f"Shard {shard['tenant_shard_id']} secondary should be in {shard['preferred_az_id']}"
# There should only be one secondary location (i.e. no migrations in flight)
assert len(shard["node_secondary"]) == 1
# Issue more concurrent operations than the storage controller's reconciler concurrency semaphore
# permits, to ensure that we are exercising stressing that.
api_concurrency = 135
@@ -242,6 +273,22 @@ def test_storage_controller_many_tenants(
f"Created {len(tenants_with_timelines)} timelines in {time.time() - t1}, {len(tenants_with_timelines) / (time.time() - t1)}/s"
)
# Check initial scheduling
assert_all_tenants_scheduled_in_home_az()
az_attached_counts: defaultdict[str, int] = defaultdict(int)
az_secondary_counts: defaultdict[str, int] = defaultdict(int)
node_attached_counts: defaultdict[str, int] = defaultdict(int)
for tenant_id in tenants.keys():
desc = env.storage_controller.tenant_describe(tenant_id)
for shard in desc["shards"]:
az_attached_counts[az_selector(shard["node_attached"])] += 1
node_attached_counts[shard["node_attached"]] += 1
for node_secondary in shard["node_secondary"]:
az_secondary_counts[az_selector(node_secondary)] += 1
log.info(f"Initial node attached counts: {node_attached_counts}")
log.info(f"Initial AZ shard counts: {az_attached_counts}, {az_secondary_counts}")
# Plan operations: ensure each tenant with a timeline gets at least
# one of each operation type. Then add other tenants to make up the
# numbers.
@@ -450,11 +497,77 @@ def test_storage_controller_many_tenants(
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
env.storage_controller.consistency_check()
# Since we did `reconcile_until_idle` during the above loop, the system should be left in
# an optimally scheduled state. Validate that this includes all the tenants being scheduled
# in their home AZ.
assert_all_tenants_scheduled_in_home_az()
# Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
# as they were not offline long enough to trigger any scheduling changes.
env.storage_controller.consistency_check()
check_memory()
# Simulate loss of an AZ
victim_az = "az-alpha"
killed_pageservers = []
for ps in env.pageservers:
if az_selector(ps.id) == victim_az:
ps.stop(immediate=True)
killed_pageservers.append(ps)
log.info(f"Killed pageserver {ps.id}")
assert killed_pageservers
# Wait for the controller to notice the pageservers are dead
def assert_pageservers_availability(
pageservers: list[NeonPageserver], expected_availability: PageserverAvailability
):
nodes = env.storage_controller.nodes()
checked_any = False
node_ids = [ps.id for ps in pageservers]
for node in nodes:
if node["id"] in node_ids:
checked_any = True
assert (
node["availability"] == expected_availability
), f"Node {node['id']} is not {expected_availability} yet: {node['availability']}"
assert checked_any
wait_until(
lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.OFFLINE),
timeout=60,
)
# Let the controller finish all its rescheduling
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
# Check that all the tenants are rescheduled to the remaining pageservers
for tenant_id in tenant_ids:
desc = env.storage_controller.tenant_describe(tenant_id)
for shard in desc["shards"]:
# Attachment should be outside the AZ where we killed the pageservers
assert (
az_selector(shard["node_attached"]) != victim_az
), f"Shard {shard['tenant_shard_id']} still in {victim_az} (node {shard['node_attached']})"
# Bring back the pageservers
for ps in killed_pageservers:
ps.start()
wait_until(
lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.ACTIVE),
timeout=60,
)
# A very long timeout is required: we will be migrating all the tenants on all the pageservers
# in the region that we just restored. Assume it'll take up to twice as long as it took to fill
# a single node
env.storage_controller.reconcile_until_idle(
max_interval=0.1, timeout_secs=DRAIN_FILL_TIMEOUT * 4
)
assert_all_tenants_scheduled_in_home_az()
# Stop the storage controller before tearing down fixtures, because it otherwise might log
# errors trying to call our `ComputeReconfigure`.
env.storage_controller.stop()

View File

@@ -84,9 +84,6 @@ page_cache_size=10
log.info("Checking layer access metrics ...")
layer_access_metric_names = [
"pageserver_layers_visited_per_read_global_sum",
"pageserver_layers_visited_per_read_global_count",
"pageserver_layers_visited_per_read_global_bucket",
"pageserver_layers_visited_per_vectored_read_global_sum",
"pageserver_layers_visited_per_vectored_read_global_count",
"pageserver_layers_visited_per_vectored_read_global_bucket",
@@ -97,12 +94,6 @@ page_cache_size=10
layer_access_metrics = metrics.query_all(name)
log.info(f"Got metrics: {layer_access_metrics}")
non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
if non_vectored_count.value != 0:
non_vectored_average = non_vectored_sum.value / non_vectored_count.value
else:
non_vectored_average = 0
vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
if vectored_count.value > 0:
@@ -113,11 +104,10 @@ page_cache_size=10
assert vectored_sum.value == 0
vectored_average = 0
log.info(f"{non_vectored_average=} {vectored_average=}")
log.info(f"{vectored_average=}")
# The upper bound for average number of layer visits below (8)
# was chosen empirically for this workload.
assert non_vectored_average < 8
assert vectored_average < 8

View File

@@ -219,7 +219,7 @@ if SQL_EXPORTER is None:
#
# The "host" network mode allows sql_exporter to talk to the
# endpoint which is running on the host.
super().__init__("docker.io/burningalchemist/sql_exporter:0.16.0", network_mode="host")
super().__init__("docker.io/burningalchemist/sql_exporter:0.17.0", network_mode="host")
self.__logs_dir = logs_dir
self.__port = port
@@ -252,7 +252,7 @@ if SQL_EXPORTER is None:
log.info("Waiting for sql_exporter to be ready")
wait_for_logs(
self,
rf'level=info msg="Listening on" address=\[::\]:{self.__port}',
rf'msg="Listening on" address=\[::\]:{self.__port}',
timeout=5,
)
@@ -344,10 +344,7 @@ else:
time.sleep(0.5)
continue
if (
f'level=info msg="Listening on" address=[::]:{self._sql_exporter_port}'
in line
):
if f'msg="Listening on" address=[::]:{self._sql_exporter_port}' in line:
break
@override

View File

@@ -187,7 +187,7 @@ def test_physical_replication_config_mismatch_too_many_known_xids(neon_simple_en
origin=primary,
endpoint_id="secondary",
config_lines=[
"max_connections=2",
"max_connections=5",
"autovacuum_max_workers=1",
"max_worker_processes=5",
"max_wal_senders=1",

View File

@@ -1,10 +1,15 @@
from __future__ import annotations
import asyncio
import ssl
import asyncpg
import pytest
import websocket_tunnel
import websockets
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonProxy
from fixtures.port_distributor import PortDistributor
@pytest.mark.asyncio
@@ -196,3 +201,53 @@ async def test_websockets_pipelined(static_proxy: NeonProxy):
# close
await websocket.send(b"X\x00\x00\x00\x04")
await websocket.wait_closed()
@pytest.mark.asyncio
async def test_websockets_tunneled(static_proxy: NeonProxy, port_distributor: PortDistributor):
static_proxy.safe_psql("create user ws_auth with password 'ws' superuser")
user = "ws_auth"
password = "ws"
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(str(static_proxy.test_output_dir / "proxy.crt"))
# Launch a tunnel service so that we can speak the websockets protocol to
# the proxy
tunnel_port = port_distributor.get_port()
tunnel_server = await websocket_tunnel.start_server(
"127.0.0.1",
tunnel_port,
f"wss://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
ssl_context,
)
log.info(f"websockets tunnel listening for connections on port {tunnel_port}")
async with tunnel_server:
async def run_tunnel():
try:
async with tunnel_server:
await tunnel_server.serve_forever()
except Exception as e:
log.error(f"Error in tunnel task: {e}")
tunnel_task = asyncio.create_task(run_tunnel())
# Ok, the tunnel is now running. Check that we can connect to the proxy's
# websocket interface, through the tunnel
tunnel_connstring = f"postgres://{user}:{password}@127.0.0.1:{tunnel_port}/postgres"
log.info(f"connecting to {tunnel_connstring}")
conn = await asyncpg.connect(tunnel_connstring)
res = await conn.fetchval("SELECT 123")
assert res == 123
await conn.close()
log.info("Ran a query successfully through the tunnel")
tunnel_server.close()
try:
await tunnel_task
except asyncio.CancelledError:
pass

View File

@@ -520,14 +520,18 @@ def test_sharding_split_smoke(
shard_count = 2
# Shard count we split into
split_shard_count = 4
# We will have 2 shards per pageserver once done (including secondaries)
neon_env_builder.num_pageservers = split_shard_count
# In preferred AZ & other AZ we will end up with one shard per pageserver
neon_env_builder.num_pageservers = split_shard_count * 2
# Two AZs
def assign_az(ps_cfg):
az = f"az-{(ps_cfg['id'] - 1) % 2}"
ps_cfg["availability_zone"] = az
# We will run more pageservers than tests usually do, so give them tiny page caches
# in case we're on a test node under memory pressure.
ps_cfg["page_cache_size"] = 128
neon_env_builder.pageserver_config_override = assign_az
# 1MiB stripes: enable getting some meaningful data distribution without
@@ -679,8 +683,8 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move)
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
# - split_shard_count/2 reconciles to migrate shards to their temporary secondaries
expect_reconciles = shard_count * 2 + split_shard_count + 3 * (split_shard_count / 2)
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
@@ -745,10 +749,14 @@ def test_sharding_split_smoke(
# dominated by shard count.
log.info(f"total: {total}")
assert total == {
1: 2,
2: 2,
3: 2,
4: 2,
1: 1,
2: 1,
3: 1,
4: 1,
5: 1,
6: 1,
7: 1,
8: 1,
}
# The controller is not required to lay out the attached locations in any particular way, but
@@ -1387,13 +1395,7 @@ def test_sharding_split_failures(
else:
attached_count += 1
if exclude_ps_id is not None:
# For a node failure case, we expect there to be a secondary location
# scheduled on the offline node, so expect one fewer secondary in total
assert secondary_count == initial_shard_count - 1
else:
assert secondary_count == initial_shard_count
assert secondary_count == initial_shard_count
assert attached_count == initial_shard_count
def assert_split_done(exclude_ps_id: int | None = None) -> None:

View File

@@ -822,6 +822,122 @@ def test_storage_controller_stuck_compute_hook(
env.storage_controller.consistency_check()
@run_only_on_default_postgres("postgres behavior is not relevant")
def test_storage_controller_compute_hook_retry(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address: ListenAddress,
):
"""
Test that when a reconciler can't do its compute hook notification, it will keep
trying until it succeeds.
Reproducer for https://github.com/neondatabase/cloud/issues/22612
"""
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
handle_params = {"status": 200}
notifications = []
def handler(request: Request):
status = handle_params["status"]
log.info(f"Notify request[{status}]: {request}")
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.create_tenant(tenant_id, placement_policy='{"Attached": 1}')
# Initial notification from tenant creation
assert len(notifications) == 1
expect: dict[str, list[dict[str, int]] | str | None | int] = {
"tenant_id": str(tenant_id),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
assert notifications[0] == expect
# Block notifications, and fail a node
handle_params["status"] = 423
env.pageservers[0].stop()
env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
# Avoid waiting for heartbeats
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})
# Make reconciler run and fail: it should leave itself in a state where the shard will retry notification later,
# and we will check that that happens
notifications = []
try:
assert env.storage_controller.reconcile_all() == 1
except StorageControllerApiException as e:
assert "Control plane tenant busy" in str(e)
assert len(notifications) == 1
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is True
)
# Try reconciling again, it should try notifying again
notifications = []
try:
assert env.storage_controller.reconcile_all() == 1
except StorageControllerApiException as e:
assert "Control plane tenant busy" in str(e)
assert len(notifications) == 1
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is True
)
# The describe API should indicate that a notification is pending
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is True
)
# Unblock notifications: reconcile should work now
handle_params["status"] = 200
notifications = []
assert env.storage_controller.reconcile_all() == 1
assert len(notifications) == 1
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is False
)
# Reconciler should be idle now that it succeeded in its compute notification
notifications = []
assert env.storage_controller.reconcile_all() == 0
assert len(notifications) == 0
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is False
)
@run_only_on_default_postgres("this test doesn't start an endpoint")
def test_storage_controller_compute_hook_revert(
httpserver: HTTPServer,
@@ -936,7 +1052,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
that just hits the endpoints to check that they don't bitrot.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 3
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
@@ -961,7 +1077,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
)
# Two nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert len(response.json()["nodes"]) == 3
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
@@ -972,13 +1088,25 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
# Secondary migration API: superficial check that it migrates
secondary_dest = env.pageservers[2].id
env.storage_controller.request(
"PUT",
f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0002/migrate_secondary",
headers=env.storage_controller.headers(TokenScope.ADMIN),
json={"tenant_shard_id": f"{tenant_id}-0002", "node_id": secondary_dest},
)
assert env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_secondary"] == [
secondary_dest
]
# Node unclean drop API
response = env.storage_controller.request(
"POST",
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(env.storage_controller.node_list()) == 1
assert len(env.storage_controller.node_list()) == 2
# Tenant unclean drop API
response = env.storage_controller.request(
@@ -1696,7 +1824,13 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
"""
output_dir = neon_env_builder.test_output_dir
shard_count = 4
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.create_tenant(tenant_id, placement_policy='{"Attached":1}', shard_count=shard_count)
base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api]
def storcon_cli(args):
@@ -1725,7 +1859,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
# List nodes
node_lines = storcon_cli(["nodes"])
# Table header, footer, and one line of data
assert len(node_lines) == 5
assert len(node_lines) == 7
assert "localhost" in node_lines[3]
# Pause scheduling onto a node
@@ -1743,10 +1877,21 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"])
assert "Offline" in storcon_cli(["nodes"])[3]
# Restore node, verify status changes in CLI output
env.pageservers[0].start()
def is_online():
assert "Offline" not in storcon_cli(["nodes"])
wait_until(is_online)
# Let everything stabilize after node failure to avoid interfering with subsequent steps
env.storage_controller.reconcile_until_idle(timeout_secs=10)
# List tenants
tenant_lines = storcon_cli(["tenants"])
assert len(tenant_lines) == 5
assert str(env.initial_tenant) in tenant_lines[3]
assert str(tenant_id) in tenant_lines[3]
# Setting scheduling policies intentionally result in warnings, they're for rare use.
env.storage_controller.allowed_errors.extend(
@@ -1754,23 +1899,58 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
)
# Describe a tenant
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(tenant_id)])
assert len(tenant_lines) >= 3 + shard_count * 2
assert str(env.initial_tenant) in tenant_lines[0]
assert str(tenant_id) in tenant_lines[0]
# Migrate an attached location
def other_ps_id(current_ps_id):
return (
env.pageservers[0].id
if current_ps_id == env.pageservers[1].id
else env.pageservers[1].id
)
storcon_cli(
[
"tenant-shard-migrate",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"]
)
),
]
)
# Migrate a secondary location
storcon_cli(
[
"tenant-shard-migrate-secondary",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"node_secondary"
][0]
)
),
]
)
# Pause changes on a tenant
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--scheduling", "stop"])
assert "Stop" in storcon_cli(["tenants"])[3]
# Cancel ongoing reconcile on a tenant
storcon_cli(
["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"]
)
storcon_cli(["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{tenant_id}-0104"])
# Change a tenant's placement
storcon_cli(
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
)
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--placement", "secondary"])
assert "Secondary" in storcon_cli(["tenants"])[3]
# Modify a tenant's config
@@ -1778,7 +1958,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
[
"patch-tenant-config",
"--tenant-id",
str(env.initial_tenant),
str(tenant_id),
"--config",
json.dumps({"pitr_interval": "1m"}),
]
@@ -3033,11 +3213,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
def assign_az(ps_cfg):
az = f"az-{ps_cfg['id']}"
az = f"az-{ps_cfg['id'] % 2}"
log.info("Assigned AZ {az}")
ps_cfg["availability_zone"] = az
neon_env_builder.pageserver_config_override = assign_az
neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
env.start()
@@ -3052,8 +3233,14 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
assert shards[0]["preferred_az_id"] == expected_az
# When all other schedule scoring parameters are equal, tenants should round-robin on AZs
assert env.storage_controller.tenant_describe(tids[0])["shards"][0]["preferred_az_id"] == "az-0"
assert env.storage_controller.tenant_describe(tids[1])["shards"][0]["preferred_az_id"] == "az-1"
assert env.storage_controller.tenant_describe(tids[2])["shards"][0]["preferred_az_id"] == "az-0"
# Try modifying preferred AZ
updated = env.storage_controller.set_preferred_azs(
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
{TenantShardId(tid, 0, 0): "az-0" for tid in tids}
)
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
@@ -3061,29 +3248,24 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
for tid in tids:
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
assert shards[0]["preferred_az_id"] == "foo"
assert shards[0]["preferred_az_id"] == "az-0"
# Generate a layer to avoid shard split handling on ps from tripping
# up on debug assert.
timeline_id = TimelineId.generate()
env.create_timeline("bar", tids[0], timeline_id)
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
workload.init()
workload.write_rows(256)
workload.validate()
# Having modified preferred AZ, we should get moved there
env.storage_controller.reconcile_until_idle(max_interval=0.1)
for tid in tids:
shard = env.storage_controller.tenant_describe(tid)["shards"][0]
attached_to = shard["node_attached"]
attached_in_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == attached_in_az == "az-0"
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
env.storage_controller.reconcile_until_idle(max_interval=0.1)
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
assert len(shards) == 2
for shard in shards:
attached_to = shard["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
# The scheduling optimization logic is not yet AZ-aware, so doesn't succeed
# in putting the tenant shards in the preferred AZ.
# To be fixed in https://github.com/neondatabase/neon/pull/9916
# assert shard["preferred_az_id"] == expected_az
attached_in_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == attached_in_az == "az-0"
@run_only_on_default_postgres("Postgres version makes no difference here")

154
test_runner/websocket_tunnel.py Executable file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
#
# This program helps to test the WebSocket tunneling in proxy. It listens for a TCP
# connection on a port, and when you connect to it, it opens a websocket connection,
# and forwards all the traffic to the websocket connection, wrapped in WebSocket binary
# frames.
#
# This is used in the test_proxy::test_websockets test, but it is handy for manual testing too.
#
# Usage for manual testing:
#
# ## Launch Posgres on port 3000:
# postgres -D data -p3000
#
# ## Launch proxy with WSS enabled:
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.neon.localtest.me'
# ./target/debug/proxy --wss 127.0.0.1:40433 --http 127.0.0.1:28080 --mgmt 127.0.0.1:9099 --proxy 127.0.0.1:4433 --tls-key server.key --tls-cert server.crt --auth-backend postgres
#
# ## Launch the tunnel:
#
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.neon.localtest.me"
#
# ## Now you can connect with psql:
# psql "postgresql://heikki@localhost:40433/postgres"
#
import argparse
import asyncio
import logging
import ssl
from ssl import Purpose
import websockets
from fixtures.log_helper import log
# Enable verbose logging of all the traffic
def enable_verbose_logging():
logger = logging.getLogger("websockets")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
async def start_server(tcp_listen_host, tcp_listen_port, ws_url, ctx):
server = await asyncio.start_server(
lambda r, w: handle_client(r, w, ws_url, ctx), tcp_listen_host, tcp_listen_port
)
return server
async def handle_tcp_to_websocket(tcp_reader, ws):
try:
while not tcp_reader.at_eof():
data = await tcp_reader.read(1024)
await ws.send(data)
except websockets.exceptions.ConnectionClosedError as e:
log.debug(f"connection closed: {e}")
except websockets.exceptions.ConnectionClosedOK:
log.debug("connection closed")
except Exception as e:
log.error(e)
async def handle_websocket_to_tcp(ws, tcp_writer):
try:
async for message in ws:
tcp_writer.write(message)
await tcp_writer.drain()
except websockets.exceptions.ConnectionClosedError as e:
log.debug(f"connection closed: {e}")
except websockets.exceptions.ConnectionClosedOK:
log.debug("connection closed")
except Exception as e:
log.error(e)
async def handle_client(tcp_reader, tcp_writer, ws_url: str, ctx: ssl.SSLContext):
try:
log.info("Received TCP connection. Connecting to websockets proxy.")
async with websockets.connect(ws_url, ssl=ctx) as ws:
try:
log.info("Connected to websockets proxy")
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(handle_tcp_to_websocket(tcp_reader, ws))
task2 = tg.create_task(handle_websocket_to_tcp(ws, tcp_writer))
done, pending = await asyncio.wait(
[task1, task2], return_when=asyncio.FIRST_COMPLETED
)
tcp_writer.close()
await ws.close()
except* Exception as ex:
log.error(ex.exceptions)
except Exception as e:
log.error(e)
async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--tcp-listen-addr",
default="localhost",
help="TCP addr to listen on",
)
parser.add_argument(
"--tcp-listen-port",
default="40444",
help="TCP port to listen on",
)
parser.add_argument(
"--ws-url",
default="wss://localhost/",
help="websocket URL to connect to. This determines the Host header sent to the server",
)
parser.add_argument(
"--ws-host",
default="127.0.0.1",
help="websockets host to connect to",
)
parser.add_argument(
"--ws-port",
type=int,
default=443,
help="websockets port to connect to",
)
parser.add_argument(
"--verbose",
action="store_true",
help="enable verbose logging",
)
args = parser.parse_args()
if args.verbose:
enable_verbose_logging()
ctx = ssl.create_default_context(Purpose.SERVER_AUTH)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
server = await start_server(args.tcp_listen_addr, args.tcp_listen_port, args.ws_url, ctx)
print(
f"Listening for connections at {args.tcp_listen_addr}:{args.tcp_listen_port}, forwarding them to {args.ws_host}:{args.ws_port}"
)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())