tests: make more fixtures/helpers shard-aware

This commit is contained in:
John Spray
2023-12-18 19:43:10 +00:00
parent a5813e2516
commit 727eef05b3
4 changed files with 85 additions and 47 deletions

View File

@@ -60,7 +60,7 @@ from fixtures.remote_storage import (
default_remote_storage,
remote_storage_to_toml_inline_table,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
allure_add_grafana_links,
@@ -1690,10 +1690,20 @@ class NeonAttachmentService:
response = requests.get(f"{self.env.control_plane_api}/tenant/{tenant_id}/locate")
response.raise_for_status()
body = response.json()
log.info(f"tenant_locate success: {body}")
shards: list[dict[str, Any]] = body["shards"]
return shards
def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
response = requests.put(
f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split",
json={"new_shard_count": shard_count},
)
response.raise_for_status()
body = response.json()
log.info(f"tenant_shard_split success: {body}")
shards: list[TenantShardId] = body["new_shards"]
return shards
def __enter__(self) -> "NeonAttachmentService":
return self
@@ -3405,6 +3415,27 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -
time.sleep(0.5)
def tenant_get_shards(
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
) -> list[tuple[TenantShardId, NeonPageserver]]:
"""
Helper for when you want to talk to one or more pageservers, and the
caller _might_ have specified a pageserver, or they might leave it to
us to figure out the shards for a tenant.
Caller should over the response to apply their per-pageserver action to
each shard
"""
if len(env.pageservers) > 1:
return [
(TenantShardId.parse(s["shard_id"]), env.get_pageserver(s["node_id"]))
for s in env.attachment_service.locate(tenant_id)
]
else:
# Assume an unsharded tenant
return [(TenantShardId(tenant_id, 0, 0), env.pageserver)]
def wait_for_last_flush_lsn(
env: NeonEnv,
endpoint: Endpoint,
@@ -3414,19 +3445,13 @@ def wait_for_last_flush_lsn(
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
if len(env.pageservers) > 1:
shards = [
(s["shard_id"], env.get_pageserver(s["node_id"]))
for s in env.attachment_service.locate(tenant)
]
else:
# Assume an unsharded tenant
shards = [(str(tenant), env.pageserver)]
shards = tenant_get_shards(env, tenant, pageserver_id)
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
results = []
for tenant_shard_id, pageserver in shards:
log.info(f"wait_for_last_flush_lsn: shard {tenant_shard_id}")
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
)
@@ -3447,9 +3472,16 @@ def wait_for_wal_insert_lsn(
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(
env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
)
result = None
for tenant_shard_id, pageserver in tenant_get_shards(env, tenant, pageserver_id):
shard_r = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
)
if result is None:
result = shard_r
assert result is not None
return result
def fork_at_current_lsn(
@@ -3483,11 +3515,13 @@ def last_flush_lsn_upload(
last_flush_lsn = wait_for_last_flush_lsn(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id
)
ps_http = env.get_pageserver(pageserver_id).http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
shards = tenant_get_shards(env, tenant_id, pageserver_id)
for tenant_shard_id, pageserver in shards:
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
return last_flush_lsn

View File

@@ -13,7 +13,7 @@ from urllib3.util.retry import Retry
from fixtures.log_helper import log
from fixtures.metrics import Metrics, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import Fn
@@ -433,7 +433,7 @@ class PageserverHttpClient(requests.Session):
def timeline_detail(
self,
tenant_id: TenantId,
tenant_id: TenantShardId,
timeline_id: TimelineId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
@@ -455,7 +455,7 @@ class PageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
def timeline_delete(self, tenant_id: TenantShardId, timeline_id: TimelineId, **kwargs):
"""
Note that deletion is not instant, it is scheduled and performed mostly in the background.
So if you need to wait for it to complete use `timeline_delete_wait_completed`.
@@ -540,7 +540,7 @@ class PageserverHttpClient(requests.Session):
return res_json
def timeline_checkpoint(
self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
self, tenant_id: TenantShardId, timeline_id: TimelineId, force_repartition=False
):
self.is_testing_enabled_or_skip()
query = {}

View File

@@ -6,7 +6,7 @@ from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorageKind, S3Storage
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
@@ -22,7 +22,7 @@ def assert_tenant_state(
def remote_consistent_lsn(
pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
pageserver_http: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http.timeline_detail(tenant, timeline)
@@ -39,7 +39,7 @@ def remote_consistent_lsn(
def wait_for_upload(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
tenant: TenantShardId,
timeline: TimelineId,
lsn: Lsn,
):
@@ -92,7 +92,7 @@ def wait_until_tenant_state(
def wait_until_timeline_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
tenant_id: TenantShardId,
timeline_id: TimelineId,
expected_state: str,
iterations: int,
@@ -141,7 +141,7 @@ def wait_until_tenant_active(
def last_record_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
pageserver_http_client: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
@@ -152,7 +152,7 @@ def last_record_lsn(
def wait_for_last_record_lsn(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
tenant: TenantShardId,
timeline: TimelineId,
lsn: Lsn,
) -> Lsn:
@@ -194,7 +194,7 @@ def wait_for_upload_queue_empty(
def wait_timeline_detail_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
tenant_id: TenantShardId,
timeline_id: TimelineId,
iterations: int,
interval: Optional[float] = None,
@@ -219,7 +219,7 @@ def wait_timeline_detail_404(
def timeline_delete_wait_completed(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
tenant_id: TenantShardId,
timeline_id: TimelineId,
iterations: int = 20,
interval: Optional[float] = None,

View File

@@ -5,6 +5,7 @@ from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
last_flush_lsn_upload,
tenant_get_shards,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
@@ -31,7 +32,7 @@ class Workload:
self._endpoint: Optional[Endpoint] = None
def endpoint(self, pageserver_id: int) -> Endpoint:
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
"main",
@@ -54,7 +55,7 @@ class Workload:
if self._endpoint is not None:
self._endpoint.stop()
def init(self, pageserver_id: int):
def init(self, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
@@ -63,7 +64,7 @@ class Workload:
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def write_rows(self, n, pageserver_id):
def write_rows(self, n, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)
start = self.expect_rows
end = start + n - 1
@@ -81,7 +82,7 @@ class Workload:
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def churn_rows(self, n, pageserver_id, upload=True):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
assert self.expect_rows >= n
max_iters = 10
@@ -119,21 +120,24 @@ class Workload:
]
)
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = self.env.get_pageserver(pageserver_id).http_client()
wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
def validate(self, pageserver_id):
def validate(self, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)
result = endpoint.safe_psql_many(
[