tests: add tests for secondary mode & multi attach

This commit is contained in:
John Spray
2023-09-07 18:06:16 +01:00
parent ce29d1588b
commit 33a66b963f
7 changed files with 738 additions and 82 deletions

View File

@@ -1860,7 +1860,7 @@ class NeonPageserver(PgProtocol):
client = self.http_client()
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any]):
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
# This API is only for use when generations are enabled
assert self.env.attachment_service is not None
@@ -1868,7 +1868,7 @@ class NeonPageserver(PgProtocol):
config["generation"] = self.env.attachment_service.attach_hook(tenant_id, self.id)
client = self.http_client()
return client.tenant_location_conf(tenant_id, config)
return client.tenant_location_conf(tenant_id, config, **kwargs)
def read_tenant_location_conf(self, tenant_id: TenantId) -> dict[str, Any]:
path = self.tenant_dir(tenant_id) / "config-v1"
@@ -2741,6 +2741,7 @@ class EndpointFactory:
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2760,6 +2761,7 @@ class EndpointFactory:
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
pageserver_id=pageserver_id,
)
def stop_all(self) -> "EndpointFactory":

View File

@@ -251,11 +251,20 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_location_conf(
    self, tenant_id: TenantId, location_conf: dict[str, Any], flush_ms=None
):
    """
    PUT a location configuration for `tenant_id` to this pageserver.

    :param tenant_id: tenant whose location config is being set; it is used in
        the URL path and also written into the request body as "tenant_id".
    :param location_conf: configuration sent as the JSON request body. It is
        copied before "tenant_id" is added, so the caller's dict is not modified.
    :param flush_ms: when not None, sent (stringified) as the `flush_ms` query
        parameter; when None, no query parameters are sent.

    The response is passed to `self.verbose_error`.
    """
    # `location_conf` is annotated (":") rather than defaulted ("="): the old
    # spelling made the generic alias `dict[str, Any]` the default *value*, so
    # omitting the argument failed inside `.copy()` instead of at the call site.
    body = location_conf.copy()
    body["tenant_id"] = str(tenant_id)

    params = {}
    if flush_ms is not None:
        params["flush_ms"] = str(flush_ms)

    res = self.put(
        f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config",
        json=body,
        params=params,
    )
    self.verbose_error(res)
@@ -662,6 +671,14 @@ class PageserverHttpClient(requests.Session):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
def secondary_tenant_upload(self, tenant_id: TenantId):
    """
    POST to this pageserver's `/v1/secondary/{tenant_id}/upload` endpoint and
    pass the response to `verbose_error`.
    """
    url = f"http://localhost:{self.port}/v1/secondary/{tenant_id}/upload"
    response = self.post(url)
    self.verbose_error(response)
def secondary_tenant_download(self, tenant_id: TenantId):
    """
    POST to this pageserver's `/v1/secondary/{tenant_id}/download` endpoint and
    pass the response to `verbose_error`.
    """
    url = f"http://localhost:{self.port}/v1/secondary/{tenant_id}/download"
    response = self.post(url)
    self.verbose_error(response)
def post_tracing_event(self, level: str, message: str):
res = self.post(
f"http://localhost:{self.port}/v1/tracing/event",

View File

@@ -47,7 +47,7 @@ def wait_for_upload(
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
log.info(f"wait finished: current remote consistent lsn {current_lsn}")
return
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
log.info(

View File

@@ -1,5 +1,13 @@
from typing import Optional
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, last_flush_lsn_upload
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
@@ -21,43 +29,82 @@ class Workload:
self.expect_rows = 0
self.churn_cursor = 0
def endpoint(self, pageserver_id):
return self.env.endpoints.create_start(
"main", tenant_id=self.tenant_id, pageserver_id=pageserver_id
self._endpoint: Optional[Endpoint] = None
# Return the workload's single cached endpoint, pointed at `pageserver_id`.
# When self._endpoint is None, an endpoint is created with the fixed id
# "ep-workload" (first argument "main" is presumably the branch name) and then
# started; otherwise the existing endpoint is reconfigured with the new
# pageserver_id. The endpoint's neon.pageserver_connstring setting is queried
# and logged, and the cached endpoint is returned.
# NOTE(review): indentation is lost in this view, so whether the connstring
# query/log runs on every call or only in the `else` branch cannot be told
# from here -- confirm against the real file.
def endpoint(self, pageserver_id: int) -> Endpoint:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
"main",
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
connstring = self._endpoint.safe_psql(
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
)
log.info(f"Workload.endpoint: connstr={connstring}")
return self._endpoint
def __del__(self):
    """Stop the cached endpoint, if one has been created, when this object is finalized."""
    ep = self._endpoint
    if ep is not None:
        ep.stop()
# Prepare the workload's schema through the endpoint for `pageserver_id`:
# create the table named by self.table, run CREATE EXTENSION IF NOT EXISTS
# neon_test_utils, then call last_flush_lsn_upload for this tenant/timeline.
# NOTE(review): this span comes from a diff view whose +/- markers were
# stripped -- the `with self.endpoint(...)` lines are presumably the removed
# version and the `endpoint = self.endpoint(...)` lines its replacement;
# confirm against the real file.
def init(self, pageserver_id: int):
with self.endpoint(pageserver_id) as endpoint:
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
endpoint = self.endpoint(pageserver_id)
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
# Insert `n` new rows through the endpoint for `pageserver_id`.
# The new ids run from self.expect_rows to self.expect_rows + n - 1 (generated
# with generate_series), every row gets the value "blah", and self.expect_rows
# is advanced by n. Returns whatever last_flush_lsn_upload returns.
# NOTE(review): this span comes from a diff view whose +/- markers were
# stripped -- the body appears twice (presumably the removed `with ...` version
# followed by its replacement); confirm against the real file.
def write_rows(self, n, pageserver_id):
with self.endpoint(pageserver_id) as endpoint:
start = self.expect_rows
end = start + n - 1
self.expect_rows += n
dummy_value = "blah"
endpoint.safe_psql(
f"""
INSERT INTO {self.table} (id, val)
SELECT g, '{dummy_value}'
FROM generate_series({start}, {end}) g
"""
)
endpoint = self.endpoint(pageserver_id)
start = self.expect_rows
end = start + n - 1
self.expect_rows += n
dummy_value = "blah"
endpoint.safe_psql(
f"""
INSERT INTO {self.table} (id, val)
SELECT g, '{dummy_value}'
FROM generate_series({start}, {end}) g
"""
)
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def churn_rows(self, n, pageserver_id):
def churn_rows(self, n, pageserver_id, upload=True):
assert self.expect_rows >= n
with self.endpoint(pageserver_id) as endpoint:
start = self.churn_cursor % (self.expect_rows)
end = (self.churn_cursor + n - 1) % (self.expect_rows)
self.churn_cursor += n
max_iters = 10
endpoint = self.endpoint(pageserver_id)
todo = n
i = 0
while todo > 0:
i += 1
if i > max_iters:
raise RuntimeError("oops")
start = self.churn_cursor % self.expect_rows
n_iter = min((self.expect_rows - start), todo)
todo -= n_iter
end = start + n_iter - 1
log.info(
f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
)
assert end < self.expect_rows
self.churn_cursor += n_iter
dummy_value = "blah"
endpoint.safe_psql_many(
[
@@ -72,17 +119,30 @@ class Workload:
]
)
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = self.env.get_pageserver(pageserver_id).http_client()
wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
# Check, through the endpoint for `pageserver_id`, that the table holds exactly
# self.expect_rows rows. The safe_psql_many form first runs
# `select clear_buffer_cache()` (presumably provided by the neon_test_utils
# extension -- confirm) and then the COUNT(*) query, so the expected result has
# one entry per statement: [("",)] for the first and [(expect_rows,)] for the
# count.
# NOTE(review): this span comes from a diff view whose +/- markers were
# stripped -- the single-query safe_psql lines and the first log/assert pair
# are presumably the removed version; confirm against the real file.
def validate(self, pageserver_id):
with self.endpoint(pageserver_id) as endpoint:
result = endpoint.safe_psql(
endpoint = self.endpoint(pageserver_id)
result = endpoint.safe_psql_many(
[
"select clear_buffer_cache()",
f"""
SELECT COUNT(*) FROM {self.table}
"""
)
SELECT COUNT(*) FROM {self.table}
""",
]
)
log.info(f"validate({self.expect_rows}): {result}")
assert result == [(self.expect_rows,)]
log.info(f"validate({self.expect_rows}): {result}")
assert result == [[("",)], [(self.expect_rows,)]]