control_plane: implement HTTP compute hook for attachment service (#6471)
## Problem

When we change which physical pageservers a tenant is attached to, we must update the control plane so that it can update computes. This will be done via an HTTP hook, as described in https://www.notion.so/neondatabase/Sharding-Service-Control-Plane-interface-6de56dd310a043bfa5c2f5564fa98365#1fe185a35d6d41f0a54279ac1a41bc94

## Summary of changes

- Add optional CLI args `--control-plane-jwt-token` and `--compute-hook-url`. If these are set, we call this HTTP endpoint instead of trying to use the neon_local LocalEnv to update compute configuration.
- Implement an HTTP-driven version of ComputeHook that calls into the configured URL.
- Notify for all tenants on startup, to ensure that we don't miss notifications if we crash partway through a change, and carry a `pending_compute_notification` flag at runtime so that notifications can fail without risking never sending the update.
- Add a test for all of this.

One might wonder: why not retry compute hook notifications "forever", rather than carrying a flag on the shard that causes reconcile() to be called again later? The reason is that we will later limit concurrency of reconciles when dealing with larger numbers of shards, and if a reconcile is stuck waiting for the control plane to accept a notification request, it could jam up the whole system and prevent us from making other changes. From the perspective of the outside world we _do_ retry forever, but we don't retry forever within a given Reconciler lifetime. The `pending_compute_notification` logic is predicated on later adding a background task that calls `Service::reconcile_all` on a schedule, so that anything that can fail a `Reconciler::reconcile` call will eventually be retried.
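For context, the JSON body that the compute hook receives has the shape exercised by the new test below: a `tenant_id` plus a list of `shards`, each with a `node_id` and `shard_number`. The following is a minimal sketch of a standalone receiver built on `pytest_httpserver`/`werkzeug` (the same libraries the test uses); the `/notify` path, port, and handler name are illustrative rather than part of this change:

```python
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

# Collected JSON bodies POSTed by the attachment service.
received: list[dict] = []


def notify_handler(request: Request) -> Response:
    # Expected shape (as asserted in the test below):
    # {"tenant_id": "<hex>", "shards": [{"node_id": 1, "shard_number": 0}]}
    received.append(request.get_json())
    return Response(status=200)


# Standalone usage; the test below instead receives `httpserver` from a pytest fixture.
server = HTTPServer(host="127.0.0.1", port=18080)  # arbitrary local port for illustration
server.expect_request("/notify", method="POST").respond_with_handler(notify_handler)
server.start()
print(f"compute hook endpoint: {server.url_for('/notify')}")
```

If a notification request fails (the hook is unreachable or returns an error), the change described above sets `pending_compute_notification` on the shard rather than retrying forever inside the Reconciler, so a later reconcile pass re-sends it.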
@@ -482,6 +482,7 @@ class NeonEnvBuilder:
         self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = []
         self.config_init_force: Optional[str] = None
         self.top_output_dir = top_output_dir
+        self.control_plane_compute_hook_api: Optional[str] = None
 
         self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
 
@@ -1007,6 +1008,9 @@ class NeonEnv:
         # The base URL of the attachment service
         self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}"
 
+        # For testing this with a fake HTTP server, enable passing through a URL from config
+        self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
+
         self.attachment_service: NeonAttachmentService = NeonAttachmentService(
             self, config.auth_enabled
         )
@@ -1026,6 +1030,9 @@ class NeonEnv:
         if self.control_plane_api is not None:
             cfg["control_plane_api"] = self.control_plane_api
 
+        if self.control_plane_compute_hook_api is not None:
+            cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api
+
         # Create config for pageserver
         http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
         pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
@@ -1904,7 +1911,7 @@ class Pagectl(AbstractNeonCli):
 
 
 class NeonAttachmentService:
-    def __init__(self, env: NeonEnv, auth_enabled):
+    def __init__(self, env: NeonEnv, auth_enabled: bool):
        self.env = env
        self.running = False
        self.auth_enabled = auth_enabled
@@ -1,14 +1,24 @@
 import time
 from collections import defaultdict
 
-from fixtures.neon_fixtures import (
-    NeonEnvBuilder,
-)
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
 from fixtures.pg_version import PgVersion
 from fixtures.types import TenantId, TimelineId
 from fixtures.utils import wait_until
+from pytest_httpserver import HTTPServer
+from werkzeug.wrappers.request import Request
+from werkzeug.wrappers.response import Response
+
+
+def get_node_shard_counts(env: NeonEnv, tenant_ids):
+    counts: defaultdict[str, int] = defaultdict(int)
+    for tid in tenant_ids:
+        for shard in env.attachment_service.locate(tid):
+            counts[shard["node_id"]] += 1
+    return counts
 
 
 def test_sharding_service_smoke(
@@ -54,14 +64,7 @@ def test_sharding_service_smoke(
     for tid in tenant_ids:
         env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
 
-    def get_node_shard_counts():
-        counts: defaultdict[str, int] = defaultdict(int)
-        for tid in tenant_ids:
-            for shard in env.attachment_service.locate(tid):
-                counts[shard["node_id"]] += 1
-        return counts
-
-    for node_id, count in get_node_shard_counts().items():
+    for node_id, count in get_node_shard_counts(env, tenant_ids).items():
         # we used a multiple of pageservers for the total shard count,
         # so expect an equal number on all pageservers
         assert count == tenant_shard_count / len(
@@ -89,7 +92,7 @@ def test_sharding_service_smoke(
     env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
 
     def node_evacuated(node_id: int):
-        counts = get_node_shard_counts()
+        counts = get_node_shard_counts(env, tenant_ids)
         assert counts[node_id] == 0
 
     wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
@@ -98,7 +101,7 @@ def test_sharding_service_smoke(
     # immediately
     env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"})
     time.sleep(1)
-    assert get_node_shard_counts()[env.pageservers[0].id] == 0
+    assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0
 
     # Delete all the tenants
     for tid in tenant_ids:
@@ -113,7 +116,7 @@ def test_sharding_service_smoke(
     for tid in tenant_ids:
         env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
 
-    counts = get_node_shard_counts()
+    counts = get_node_shard_counts(env, tenant_ids)
     # Nothing should have been scheduled on the node in Draining
     assert counts[env.pageservers[1].id] == 0
     assert counts[env.pageservers[0].id] == tenant_shard_count // 2
@@ -270,3 +273,73 @@ def test_sharding_service_onboarding(
     # The onboarded tenant should survive a restart of the pageserver
     dest_ps.stop()
     dest_ps.start()
+
+
+def test_sharding_service_compute_hook(
+    httpserver: HTTPServer,
+    neon_env_builder: NeonEnvBuilder,
+    httpserver_listen_address,
+):
+    """
+    Test that the sharding service calls out to the configured HTTP endpoint on attachment changes
+    """
+
+    # We will run two pageservers so that we can migrate between them and check that the
+    # attachment service sends notifications when migrating.
+    neon_env_builder.num_pageservers = 2
+    (host, port) = httpserver_listen_address
+    neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
+
+    # Set up fake HTTP notify endpoint
+    notifications = []
+
+    def handler(request: Request):
+        log.info(f"Notify request: {request}")
+        notifications.append(request.json)
+        return Response(status=200)
+
+    httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
+
+    # Start running
+    env = neon_env_builder.init_start()
+
+    # We will do an unclean migration, which will result in deletion queue warnings
+    env.pageservers[0].allowed_errors.append(".*Dropped remote consistent LSN updates for tenant.*")
+
+    # Initial notification from tenant creation
+    assert len(notifications) == 1
+    expect = {
+        "tenant_id": str(env.initial_tenant),
+        "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
+    }
+
+    env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
+
+    def node_evacuated(node_id: int):
+        counts = get_node_shard_counts(env, [env.initial_tenant])
+        assert counts[node_id] == 0
+
+    wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
+
+    # Additional notification from migration
+    log.info(f"notifications: {notifications}")
+    expect = {
+        "tenant_id": str(env.initial_tenant),
+        "shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}],
+    }
+
+    def received_migration_notification():
+        assert len(notifications) == 2
+        assert notifications[1] == expect
+
+    wait_until(20, 0.25, received_migration_notification)
+
+    # When we restart, we should re-emit notifications for all tenants
+    env.attachment_service.stop()
+    env.attachment_service.start()
+
+    def received_restart_notification():
+        assert len(notifications) == 3
+        assert notifications[1] == expect
+
+    wait_until(10, 1, received_restart_notification)