mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
tests: start adding tests for secondary mode, live migration (#5842)
These tests have been loitering on a branch of mine for a while: they already provide value even without all the secondary mode bits landed yet, and the Workload helper is handy for other tests too. - `Workload` is a re-usable test workload that replaces some of the arbitrary "write a few rows" SQL that I've found my self repeating, and adds a systematic way to append data and check that reads properly reflect the changes. This append+validate stuff is important when doing migrations, as we want to detect situations where we might be reading from a pageserver that has not properly seen latest changes. - test_multi_attach is a validation of how the pageserver handles attaching the same tenant to multiple pageservers, from a safety point of view. This is intentionally separate from the larger testing of migration, to provide an isolated environment for multi-attachment. - test_location_conf_churn is a pseudo-random walk through the various states that TenantSlot can be put into, with validation that attached tenants remain externally readable when they should, and as a side effect validating that the compute endpoint's online configuration changes work as expected. - test_live_migration is the reference implementation of how to drive a pair of pageservers through a zero-downtime migration of a tenant. --------- Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
148
test_runner/fixtures/workload.py
Normal file
148
test_runner/fixtures/workload.py
Normal file
@@ -0,0 +1,148 @@
|
||||
from typing import Optional
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
class Workload:
|
||||
"""
|
||||
This is not a general purpose load generator: it exists for storage tests that need to inject some
|
||||
high level types of storage work via the postgres interface:
|
||||
- layer writes (`write_rows`)
|
||||
- work for compaction (`churn_rows`)
|
||||
- reads, checking we get the right data (`validate`)
|
||||
"""
|
||||
|
||||
def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
self.env = env
|
||||
self.tenant_id = tenant_id
|
||||
self.timeline_id = timeline_id
|
||||
self.table = "foo"
|
||||
|
||||
self.expect_rows = 0
|
||||
self.churn_cursor = 0
|
||||
|
||||
self._endpoint: Optional[Endpoint] = None
|
||||
|
||||
def endpoint(self, pageserver_id: int) -> Endpoint:
|
||||
if self._endpoint is None:
|
||||
self._endpoint = self.env.endpoints.create(
|
||||
"main",
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id="ep-workload",
|
||||
)
|
||||
self._endpoint.start(pageserver_id=pageserver_id)
|
||||
else:
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
|
||||
connstring = self._endpoint.safe_psql(
|
||||
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
|
||||
)
|
||||
log.info(f"Workload.endpoint: connstr={connstring}")
|
||||
|
||||
return self._endpoint
|
||||
|
||||
def __del__(self):
|
||||
if self._endpoint is not None:
|
||||
self._endpoint.stop()
|
||||
|
||||
def init(self, pageserver_id: int):
|
||||
endpoint = self.endpoint(pageserver_id)
|
||||
|
||||
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
|
||||
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
|
||||
last_flush_lsn_upload(
|
||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
||||
)
|
||||
|
||||
def write_rows(self, n, pageserver_id):
|
||||
endpoint = self.endpoint(pageserver_id)
|
||||
start = self.expect_rows
|
||||
end = start + n - 1
|
||||
self.expect_rows += n
|
||||
dummy_value = "blah"
|
||||
endpoint.safe_psql(
|
||||
f"""
|
||||
INSERT INTO {self.table} (id, val)
|
||||
SELECT g, '{dummy_value}'
|
||||
FROM generate_series({start}, {end}) g
|
||||
"""
|
||||
)
|
||||
|
||||
return last_flush_lsn_upload(
|
||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
||||
)
|
||||
|
||||
def churn_rows(self, n, pageserver_id, upload=True):
|
||||
assert self.expect_rows >= n
|
||||
|
||||
max_iters = 10
|
||||
endpoint = self.endpoint(pageserver_id)
|
||||
todo = n
|
||||
i = 0
|
||||
while todo > 0:
|
||||
i += 1
|
||||
if i > max_iters:
|
||||
raise RuntimeError("oops")
|
||||
start = self.churn_cursor % self.expect_rows
|
||||
n_iter = min((self.expect_rows - start), todo)
|
||||
todo -= n_iter
|
||||
|
||||
end = start + n_iter - 1
|
||||
|
||||
log.info(
|
||||
f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
|
||||
)
|
||||
|
||||
assert end < self.expect_rows
|
||||
|
||||
self.churn_cursor += n_iter
|
||||
dummy_value = "blah"
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
f"""
|
||||
INSERT INTO {self.table} (id, val)
|
||||
SELECT g, '{dummy_value}'
|
||||
FROM generate_series({start}, {end}) g
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET val = EXCLUDED.val
|
||||
""",
|
||||
f"VACUUM {self.table}",
|
||||
]
|
||||
)
|
||||
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
||||
)
|
||||
ps_http = self.env.get_pageserver(pageserver_id).http_client()
|
||||
wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
|
||||
|
||||
if upload:
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
|
||||
wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
|
||||
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
||||
else:
|
||||
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
||||
|
||||
def validate(self, pageserver_id):
|
||||
endpoint = self.endpoint(pageserver_id)
|
||||
result = endpoint.safe_psql_many(
|
||||
[
|
||||
"select clear_buffer_cache()",
|
||||
f"""
|
||||
SELECT COUNT(*) FROM {self.table}
|
||||
""",
|
||||
]
|
||||
)
|
||||
|
||||
log.info(f"validate({self.expect_rows}): {result}")
|
||||
assert result == [[("",)], [(self.expect_rows,)]]
|
||||
Reference in New Issue
Block a user