## Problem

To test sharding, we need something to control it. We could write Python code for this in the test runner, but that wouldn't be usable when running neon_local directly, and when we want to write tests with a large number of shards/tenants, Rust is a better fit for efficiently handling all the required state.

This service enables automated tests to easily get a system with sharding/HA without the test itself having to set it all up by hand: existing tests can be run against sharded tenants just by setting a shard count when creating the tenant.

## Summary of changes

The attachment service was previously a map of TenantId->TenantState, where the principal state stored for each tenant was the generation and the last attached pageserver. This enabled it to serve the re-attach and validate requests that the pageserver requires.

In this PR, the scope of the service is extended substantially to do overall management of tenants on the pageservers, including tenant/timeline creation, live migration, evacuation of offline pageservers, etc. This is done with synchronous code that makes declarative changes to the tenant's intended state (`TenantState.policy` and `TenantState.intent`), which the `Reconciler` then translates into calls into the pageserver (see the illustrative sketch after this description).

Top-level summary of modules within `control_plane/attachment_service/src`:
- `tenant_state`: structure that represents one tenant shard.
- `service`: implements the main high-level operations, such as tenant/timeline creation, marking a node offline, etc.
- `scheduler`: operations that need to pick a pageserver for a tenant construct a scheduler and call into it.
- `compute_hook`: receives notifications when a tenant shard is attached somewhere new. Once we have locations for all the shards in a tenant, it emits an update to the postgres configuration via the neon_local `LocalEnv`.
- `http`: HTTP stubs. These mostly map to methods on `Service`, but are separated for readability and so that it will be easier to adapt if/when we switch to another RPC layer.
- `node`: structure that describes a pageserver node. The most important attribute of a node is its availability: marking a node offline causes tenant shards to reschedule away from it.

This PR is a precursor to implementing the full sharding service for prod (#6342). What's the difference between this and a production-ready controller for pageservers?
- JSON file persistence, to be replaced with a database.
- Limited observability.
- No concurrency limits: marking a pageserver offline will try to migrate every tenant to a new pageserver concurrently, even if there are thousands.
- Very simple scheduler that only knows to pick the pageserver with the fewest tenants, and to place secondary locations on a different pageserver than attached locations: it does not try to place shards of the same tenant on different pageservers. This matters little in tests, because picking the least-used pageserver usually results in round-robin placement.
- Scheduler state is rebuilt exhaustively for each operation that requires a scheduler.
- Relies on neon_local mechanisms for updating postgres: in production this would flow through the real control plane.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
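To make the intent/`Reconciler` split described above concrete, here is a rough Python analogy of the flow (the real implementation is Rust under `control_plane/attachment_service/src`; every name and field below is an illustrative assumption, not the actual types or pageserver API):

```python
# Illustrative analogy of the declarative flow: synchronous code edits the
# tenant shard's intended state, then a reconciler makes pageservers match it.
# Names like Intent, mark_node_offline, pick_least_loaded and attach are
# assumptions for this sketch, not the real Rust types or HTTP API.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Intent:
    attached_pageserver: Optional[int] = None
    secondary_pageservers: List[int] = field(default_factory=list)


@dataclass
class TenantShardState:
    generation: int = 0
    intent: Intent = field(default_factory=Intent)
    observed: Intent = field(default_factory=Intent)


def mark_node_offline(shards, node_id, scheduler):
    # Declarative step: only edit intended state; no pageserver calls here.
    for shard in shards:
        if shard.intent.attached_pageserver == node_id:
            shard.intent.attached_pageserver = scheduler.pick_least_loaded(exclude=[node_id])


def reconcile(shard, pageserver_api):
    # Imperative step: make pageservers match the intent, bumping the
    # generation whenever the attachment location changes.
    if shard.observed.attached_pageserver != shard.intent.attached_pageserver:
        shard.generation += 1
        pageserver_api.attach(shard.intent.attached_pageserver, generation=shard.generation)
        shard.observed.attached_pageserver = shard.intent.attached_pageserver
```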
The `Workload` test fixture shown on this page (153 lines, 5.2 KiB, Python):
```python
from typing import Optional

from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    Endpoint,
    NeonEnv,
    last_flush_lsn_upload,
    tenant_get_shards,
    wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId


class Workload:
    """
    This is not a general purpose load generator: it exists for storage tests that need to inject some
    high level types of storage work via the postgres interface:
    - layer writes (`write_rows`)
    - work for compaction (`churn_rows`)
    - reads, checking we get the right data (`validate`)
    """

    def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
        self.env = env
        self.tenant_id = tenant_id
        self.timeline_id = timeline_id
        self.table = "foo"

        self.expect_rows = 0
        self.churn_cursor = 0

        self._endpoint: Optional[Endpoint] = None

    def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
        if self._endpoint is None:
            self._endpoint = self.env.endpoints.create(
                "main",
                tenant_id=self.tenant_id,
                pageserver_id=pageserver_id,
                endpoint_id="ep-workload",
            )
            self._endpoint.start(pageserver_id=pageserver_id)
        else:
            self._endpoint.reconfigure(pageserver_id=pageserver_id)

        connstring = self._endpoint.safe_psql(
            "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
        )
        log.info(f"Workload.endpoint: connstr={connstring}")

        return self._endpoint

    def __del__(self):
        if self._endpoint is not None:
            self._endpoint.stop()

    def init(self, pageserver_id: Optional[int] = None):
        endpoint = self.endpoint(pageserver_id)

        endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
        endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
        last_flush_lsn_upload(
            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
        )

    def write_rows(self, n, pageserver_id: Optional[int] = None):
        endpoint = self.endpoint(pageserver_id)
        start = self.expect_rows
        end = start + n - 1
        self.expect_rows += n
        dummy_value = "blah"
        endpoint.safe_psql(
            f"""
            INSERT INTO {self.table} (id, val)
            SELECT g, '{dummy_value}'
            FROM generate_series({start}, {end}) g
            """
        )

        return last_flush_lsn_upload(
            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
        )

    def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
        assert self.expect_rows >= n

        max_iters = 10
        endpoint = self.endpoint(pageserver_id)
        todo = n
        i = 0
        while todo > 0:
            i += 1
            if i > max_iters:
                raise RuntimeError("oops")
            start = self.churn_cursor % self.expect_rows
            n_iter = min((self.expect_rows - start), todo)
            todo -= n_iter

            end = start + n_iter - 1

            log.info(
                f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
            )

            assert end < self.expect_rows

            self.churn_cursor += n_iter
            dummy_value = "blah"
            endpoint.safe_psql_many(
                [
                    f"""
                    INSERT INTO {self.table} (id, val)
                    SELECT g, '{dummy_value}'
                    FROM generate_series({start}, {end}) g
                    ON CONFLICT (id) DO UPDATE
                    SET val = EXCLUDED.val
                    """,
                    f"VACUUM {self.table}",
                ]
            )

        for tenant_shard_id, pageserver in tenant_get_shards(
            self.env, self.tenant_id, pageserver_id
        ):
            last_flush_lsn = wait_for_last_flush_lsn(
                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
            )
            ps_http = pageserver.http_client()
            wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)

            if upload:
                # force a checkpoint to trigger upload
                ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
                wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
                log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
            else:
                log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

    def validate(self, pageserver_id: Optional[int] = None):
        endpoint = self.endpoint(pageserver_id)
        result = endpoint.safe_psql_many(
            [
                "select clear_buffer_cache()",
                f"""
                SELECT COUNT(*) FROM {self.table}
                """,
            ]
        )

        log.info(f"validate({self.expect_rows}): {result}")
        assert result == [[("",)], [(self.expect_rows,)]]
```
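For context, a hedged sketch of how a storage test might drive this fixture against a sharded tenant. The exact fixture entry points here (`neon_env_builder`, the module path for `Workload`, `create_tenant` and its `shard_count` argument) are assumptions for illustration, not verified against this revision:

```python
# Hypothetical usage sketch; fixture names and signatures are assumptions.
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.workload import Workload  # assumed module path for the class above


def test_churn_and_validate(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()

    # Assumed: passing a shard count at tenant creation is all an existing test
    # needs in order to run against a sharded tenant, per the PR description.
    tenant_id, timeline_id = env.neon_cli.create_tenant(shard_count=4)

    workload = Workload(env, tenant_id, timeline_id)
    workload.init()
    workload.write_rows(1000)
    workload.churn_rows(500)  # rewrite half the rows to generate compaction work
    workload.validate()       # clear caches, then check the expected row count
```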