def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
    """
    Ask the attachment service to split a tenant into `shard_count` shards.

    Sends a PUT to `/tenant/{tenant_id}/shard_split` on the control plane API
    with `new_shard_count` in the JSON body. Raises `requests.HTTPError` (via
    `raise_for_status`) if the service answers with an error status.

    Returns the entries of `new_shards` from the response, parsed into
    `TenantShardId` so the value matches the declared return type.
    """
    response = requests.put(
        f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split",
        json={"new_shard_count": shard_count},
    )
    response.raise_for_status()
    body = response.json()
    log.info(f"tenant_shard_split success: {body}")
    # response.json() only yields JSON-native values, so returning body["new_shards"]
    # directly would hand callers something other than the annotated TenantShardId.
    # NOTE(review): assumes each entry uses the same string form as `shard_id` in the
    # locate response, which is parsed with TenantShardId.parse elsewhere — confirm.
    return [TenantShardId.parse(shard_id) for shard_id in body["new_shards"]]
def tenant_get_shards(
    env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
) -> list[tuple[TenantShardId, NeonPageserver]]:
    """
    Helper for when you want to talk to one or more pageservers, and the
    caller _might_ have specified a pageserver, or they might leave it to
    us to figure out the shards for a tenant.

    If `pageserver_id` is given, every returned shard is paired with that
    pageserver. Otherwise each shard is paired with the pageserver reported
    by the attachment service (multi-pageserver environments) or with
    `env.pageserver` (single-pageserver environments, where the tenant is
    assumed to be unsharded).

    Caller should iterate over the response to apply their per-pageserver
    action to each shard
    """
    # Honour an explicit pageserver choice: previously this argument was accepted
    # but ignored, so callers asking for a specific pageserver were silently
    # redirected to the located/default one.
    override_pageserver = None
    if pageserver_id is not None:
        override_pageserver = env.get_pageserver(pageserver_id)

    if len(env.pageservers) > 1:
        return [
            (
                TenantShardId.parse(s["shard_id"]),
                override_pageserver
                if override_pageserver is not None
                else env.get_pageserver(s["node_id"]),
            )
            for s in env.attachment_service.locate(tenant_id)
        ]

    # Assume an unsharded tenant
    pageserver = override_pageserver if override_pageserver is not None else env.pageserver
    return [(TenantShardId(tenant_id, 0, 0), pageserver)]
pageserver_id=pageserver_id ) - ps_http = env.get_pageserver(pageserver_id).http_client() - wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn) - # force a checkpoint to trigger upload - ps_http.timeline_checkpoint(tenant_id, timeline_id) - wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn) + shards = tenant_get_shards(env, tenant_id, pageserver_id) + for tenant_shard_id, pageserver in shards: + ps_http = pageserver.http_client() + wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn) + # force a checkpoint to trigger upload + ps_http.timeline_checkpoint(tenant_shard_id, timeline_id) + wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn) return last_flush_lsn diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index ad701bae7a..3e35b88000 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -13,7 +13,7 @@ from urllib3.util.retry import Retry from fixtures.log_helper import log from fixtures.metrics import Metrics, parse_metrics from fixtures.pg_version import PgVersion -from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.utils import Fn @@ -433,7 +433,7 @@ class PageserverHttpClient(requests.Session): def timeline_detail( self, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, include_non_incremental_logical_size: bool = False, include_timeline_dir_layer_file_size_sum: bool = False, @@ -455,7 +455,7 @@ class PageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs): + def timeline_delete(self, tenant_id: TenantShardId, timeline_id: TimelineId, **kwargs): """ Note that deletion is not instant, it is scheduled and performed mostly in the background. 
So if you need to wait for it to complete use `timeline_delete_wait_completed`. @@ -540,7 +540,7 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_checkpoint( - self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False + self, tenant_id: TenantShardId, timeline_id: TimelineId, force_repartition=False ): self.is_testing_enabled_or_skip() query = {} diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index e7b78cfb9a..0dd771568a 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -6,7 +6,7 @@ from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.remote_storage import RemoteStorageKind, S3Storage -from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.utils import wait_until @@ -22,7 +22,7 @@ def assert_tenant_state( def remote_consistent_lsn( - pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId + pageserver_http: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId ) -> Lsn: detail = pageserver_http.timeline_detail(tenant, timeline) @@ -39,7 +39,7 @@ def remote_consistent_lsn( def wait_for_upload( pageserver_http: PageserverHttpClient, - tenant: TenantId, + tenant: TenantShardId, timeline: TimelineId, lsn: Lsn, ): @@ -92,7 +92,7 @@ def wait_until_tenant_state( def wait_until_timeline_state( pageserver_http: PageserverHttpClient, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, expected_state: str, iterations: int, @@ -141,7 +141,7 @@ def wait_until_tenant_active( def last_record_lsn( - pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId + pageserver_http_client: PageserverHttpClient, tenant: 
TenantShardId, timeline: TimelineId ) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) @@ -152,7 +152,7 @@ def last_record_lsn( def wait_for_last_record_lsn( pageserver_http: PageserverHttpClient, - tenant: TenantId, + tenant: TenantShardId, timeline: TimelineId, lsn: Lsn, ) -> Lsn: @@ -194,7 +194,7 @@ def wait_for_upload_queue_empty( def wait_timeline_detail_404( pageserver_http: PageserverHttpClient, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, iterations: int, interval: Optional[float] = None, @@ -219,7 +219,7 @@ def wait_timeline_detail_404( def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, iterations: int = 20, interval: Optional[float] = None, diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index 241531437c..30def1194d 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -5,6 +5,7 @@ from fixtures.neon_fixtures import ( Endpoint, NeonEnv, last_flush_lsn_upload, + tenant_get_shards, wait_for_last_flush_lsn, ) from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload @@ -31,7 +32,7 @@ class Workload: self._endpoint: Optional[Endpoint] = None - def endpoint(self, pageserver_id: int) -> Endpoint: + def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint: if self._endpoint is None: self._endpoint = self.env.endpoints.create( "main", @@ -54,7 +55,7 @@ class Workload: if self._endpoint is not None: self._endpoint.stop() - def init(self, pageserver_id: int): + def init(self, pageserver_id: Optional[int] = None): endpoint = self.endpoint(pageserver_id) endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);") @@ -63,7 +64,7 @@ class Workload: self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id ) - def write_rows(self, n, pageserver_id): + def 
write_rows(self, n, pageserver_id: Optional[int] = None): endpoint = self.endpoint(pageserver_id) start = self.expect_rows end = start + n - 1 @@ -81,7 +82,7 @@ class Workload: self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id ) - def churn_rows(self, n, pageserver_id, upload=True): + def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True): assert self.expect_rows >= n max_iters = 10 @@ -119,21 +120,24 @@ class Workload: ] ) - last_flush_lsn = wait_for_last_flush_lsn( - self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id - ) - ps_http = self.env.get_pageserver(pageserver_id).http_client() - wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn) + for tenant_shard_id, pageserver in tenant_get_shards( + self.env, self.tenant_id, pageserver_id + ): + last_flush_lsn = wait_for_last_flush_lsn( + self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id + ) + ps_http = pageserver.http_client() + wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn) - if upload: - # force a checkpoint to trigger upload - ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id) - wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn) - log.info(f"Churn: waiting for remote LSN {last_flush_lsn}") - else: - log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}") + if upload: + # force a checkpoint to trigger upload + ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id) + wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn) + log.info(f"Churn: waiting for remote LSN {last_flush_lsn}") + else: + log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}") - def validate(self, pageserver_id): + def validate(self, pageserver_id: Optional[int] = None): endpoint = self.endpoint(pageserver_id) result = endpoint.safe_psql_many( [