diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py
index a6a25da332..7c489bda67 100644
--- a/test_runner/fixtures/metrics.py
+++ b/test_runner/fixtures/metrics.py
@@ -16,6 +16,7 @@ class Metrics:
     def query_all(self, name: str, filter: Optional[Dict[str, str]] = None) -> List[Sample]:
         filter = filter or {}
         res = []
+
         for sample in self.metrics[name]:
             try:
                 if all(sample.labels[k] == v for k, v in filter.items()):
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 5b1a8ba27d..174f2bd9ba 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -60,7 +60,7 @@ from fixtures.remote_storage import (
     default_remote_storage,
     remote_storage_to_toml_inline_table,
 )
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
 from fixtures.utils import (
     ATTACHMENT_NAME_REGEX,
     allure_add_grafana_links,
@@ -490,6 +490,8 @@ class NeonEnvBuilder:
         self,
         initial_tenant_conf: Optional[Dict[str, str]] = None,
         default_remote_storage_if_missing: bool = True,
+        initial_tenant_shard_count: Optional[int] = None,
+        initial_tenant_shard_stripe_size: Optional[int] = None,
     ) -> NeonEnv:
         """
         Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline.
@@ -507,7 +509,11 @@ class NeonEnvBuilder:
             f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
         )
         initial_tenant, initial_timeline = env.neon_cli.create_tenant(
-            tenant_id=env.initial_tenant, conf=initial_tenant_conf, timeline_id=env.initial_timeline
+            tenant_id=env.initial_tenant,
+            conf=initial_tenant_conf,
+            timeline_id=env.initial_timeline,
+            shard_count=initial_tenant_shard_count,
+            shard_stripe_size=initial_tenant_shard_stripe_size,
         )
         assert env.initial_tenant == initial_tenant
         assert env.initial_timeline == initial_timeline
@@ -1130,15 +1136,29 @@ class AbstractNeonCli(abc.ABC):
                 env_vars[var] = val

         # Intercept CalledProcessError and print more info
-        res = subprocess.run(
-            args,
-            env=env_vars,
-            check=False,
-            universal_newlines=True,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            timeout=timeout,
-        )
+        try:
+            res = subprocess.run(
+                args,
+                env=env_vars,
+                check=False,
+                universal_newlines=True,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                timeout=timeout,
+            )
+        except subprocess.TimeoutExpired as e:
+            if e.stderr:
+                stderr = e.stderr.decode(errors="replace")
+            else:
+                stderr = ""
+
+            if e.stdout:
+                stdout = e.stdout.decode(errors="replace")
+            else:
+                stdout = ""
+
+            log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
+            raise

         indent = " "
         if not res.returncode:
@@ -1189,6 +1209,8 @@ class NeonCli(AbstractNeonCli):
         tenant_id: Optional[TenantId] = None,
         timeline_id: Optional[TimelineId] = None,
         conf: Optional[Dict[str, str]] = None,
+        shard_count: Optional[int] = None,
+        shard_stripe_size: Optional[int] = None,
         set_default: bool = False,
     ) -> Tuple[TenantId, TimelineId]:
         """
@@ -1216,6 +1238,12 @@ class NeonCli(AbstractNeonCli):
         if set_default:
             args.append("--set-default")

+        if shard_count is not None:
+            args.extend(["--shard-count", str(shard_count)])
+
+        if shard_stripe_size is not None:
+            args.extend(["--shard-stripe-size", str(shard_stripe_size)])
+
         res = self.raw_cli(args)
         res.check_returncode()
         return tenant_id, timeline_id
@@ -1536,6 +1564,19 @@ class NeonCli(AbstractNeonCli):

         return self.raw_cli(args, check_return_code=True)

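+    # Wraps `neon_local tenant migrate` (description inferred from the args
+    # below): moves one tenant shard to the pageserver with the given id;
+    # timeout_secs bounds how long the CLI call may run.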
+    def tenant_migrate(
+        self, tenant_shard_id: TenantShardId, new_pageserver: int, timeout_secs: Optional[int]
+    ):
+        args = [
+            "tenant",
+            "migrate",
+            "--tenant-id",
+            str(tenant_shard_id),
+            "--id",
+            str(new_pageserver),
+        ]
+        return self.raw_cli(args, check_return_code=True, timeout=timeout_secs)
+
     def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
         return self.raw_cli(["start"], check_return_code=check_return_code)

@@ -1631,6 +1672,66 @@ class NeonAttachmentService:
         else:
             return None

+    def node_register(self, node: NeonPageserver):
+        body = {
+            "node_id": int(node.id),
+            "listen_http_addr": "localhost",
+            "listen_http_port": node.service_port.http,
+        }
+        log.info(f"node_register({body})")
+        requests.post(f"{self.env.control_plane_api}/node", json=body).raise_for_status()
+
+    def tenant_create(
+        self,
+        tenant_id: TenantId,
+        shard_count: Optional[int] = None,
+        shard_stripe_size: Optional[int] = None,
+        tenant_config: Optional[Dict[Any, Any]] = None,
+    ):
+        body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)}
+
+        if shard_count is not None:
+            shard_params = {"count": shard_count}
+            if shard_stripe_size is not None:
+                shard_params["stripe_size"] = shard_stripe_size
+
+            body["shard_parameters"] = shard_params
+
+        if tenant_config is not None:
+            for k, v in tenant_config.items():
+                body[k] = v
+
+        response = requests.post(f"{self.env.control_plane_api}/tenant", json=body)
+        response.raise_for_status()
+        log.info(f"tenant_create success: {response.json()}")
+
+    def tenant_timeline_create(self, tenant_id: TenantId, timeline_id: TimelineId):
+        body: Dict[str, Any] = {"new_timeline_id": str(timeline_id)}
+
+        response = requests.post(
+            f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body
+        )
+        response.raise_for_status()
+        log.info(f"tenant_timeline_create success: {response.json()}")
+
+    def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
+        response = requests.get(f"{self.env.control_plane_api}/tenant/{tenant_id}/locate")
+        response.raise_for_status()
+        body = response.json()
+        shards: list[dict[str, Any]] = body["shards"]
+        return shards
+
+    def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
+        response = requests.put(
+            f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split",
+            json={"new_shard_count": shard_count},
+        )
+        response.raise_for_status()
+        body = response.json()
+        log.info(f"tenant_shard_split success: {body}")
+        shards: list[TenantShardId] = body["new_shards"]
+        return shards
+
     def __enter__(self) -> "NeonAttachmentService":
         return self

@@ -3201,7 +3302,7 @@ def pytest_addoption(parser: Parser):


 SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql)"
+    r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
 )


@@ -3297,9 +3398,7 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:


 # pg is the existing and running compute node, that we want to compare with a basebackup
-def check_restored_datadir_content(
-    test_output_dir: Path, env: NeonEnv, endpoint: Endpoint, pageserver_id: Optional[int] = None
-):
+def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
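+    # (pageserver_id is no longer a parameter here: the pageserver that holds
+    # the tenant is looked up via the attachment service below)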
     # Get the timeline ID. We need it for the 'basebackup' command
     timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])

@@ -3320,6 +3419,7 @@ def check_restored_datadir_content(
     pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
     psql_path = os.path.join(pg_bin.pg_bin_path, "psql")

+    pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
     cmd = rf"""
         {psql_path} \
             --no-psqlrc \
@@ -3388,6 +3488,27 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
         time.sleep(0.5)


+def tenant_get_shards(
+    env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
+) -> list[tuple[TenantShardId, NeonPageserver]]:
+    """
+    Helper for when you want to talk to one or more pageservers, and the
+    caller _might_ have specified a pageserver, or they might leave it to
+    us to figure out the shards for a tenant.
+
+    Callers should iterate over the response to apply their per-pageserver
+    action to each shard.
+    """
+    if len(env.pageservers) > 1:
+        return [
+            (TenantShardId.parse(s["shard_id"]), env.get_pageserver(s["node_id"]))
+            for s in env.attachment_service.locate(tenant_id)
+        ]
+    else:
+        # Assume an unsharded tenant
+        return [(TenantShardId(tenant_id, 0, 0), env.pageserver)]
+
+
 def wait_for_last_flush_lsn(
     env: NeonEnv,
     endpoint: Endpoint,
@@ -3397,10 +3518,22 @@
 ) -> Lsn:
     """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""

+    shards = tenant_get_shards(env, tenant, pageserver_id)
+
     last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
-    return wait_for_last_record_lsn(
-        env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
-    )
+
+    results = []
+    for tenant_shard_id, pageserver in shards:
+        log.info(f"wait_for_last_flush_lsn: shard {tenant_shard_id}")
+        waited = wait_for_last_record_lsn(
+            pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
+        )
+
+        assert waited >= last_flush_lsn
+        results.append(waited)
+
+    # Return the lowest LSN that has been ingested by all shards
+    return min(results)


 def wait_for_wal_insert_lsn(
@@ -3412,9 +3545,16 @@
 ) -> Lsn:
     """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
     last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
-    return wait_for_last_record_lsn(
-        env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
-    )
+    result = None
+    for tenant_shard_id, pageserver in tenant_get_shards(env, tenant, pageserver_id):
+        shard_r = wait_for_last_record_lsn(
+            pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
+        )
+        if result is None:
+            result = shard_r
+
+    assert result is not None
+    return result


 def fork_at_current_lsn(
@@ -3448,11 +3588,13 @@ def last_flush_lsn_upload(
     last_flush_lsn = wait_for_last_flush_lsn(
         env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id
     )
-    ps_http = env.get_pageserver(pageserver_id).http_client()
-    wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
-    # force a checkpoint to trigger upload
-    ps_http.timeline_checkpoint(tenant_id, timeline_id)
-    wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
+    shards = tenant_get_shards(env, tenant_id, pageserver_id)
+    for tenant_shard_id, pageserver in shards:
+        ps_http = pageserver.http_client()
+        wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
+        # force a checkpoint to trigger upload
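+        # (the checkpoint flushes in-memory data for this shard so that
+        # wait_for_upload below can observe it reaching remote storage)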
+        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
+        wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)

     return last_flush_lsn

diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 6dea0d923d..711f362882 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -13,7 +13,7 @@ from urllib3.util.retry import Retry
 from fixtures.log_helper import log
 from fixtures.metrics import Metrics, parse_metrics
 from fixtures.pg_version import PgVersion
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
 from fixtures.utils import Fn

@@ -433,7 +433,7 @@ class PageserverHttpClient(requests.Session):

     def timeline_detail(
         self,
-        tenant_id: TenantId,
+        tenant_id: TenantShardId,
         timeline_id: TimelineId,
         include_non_incremental_logical_size: bool = False,
         include_timeline_dir_layer_file_size_sum: bool = False,
@@ -455,7 +455,7 @@ class PageserverHttpClient(requests.Session):
         assert isinstance(res_json, dict)
         return res_json

-    def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
+    def timeline_delete(self, tenant_id: TenantShardId, timeline_id: TimelineId, **kwargs):
         """
         Note that deletion is not instant, it is scheduled and performed mostly in the background.
         So if you need to wait for it to complete use `timeline_delete_wait_completed`.
@@ -469,7 +469,7 @@ class PageserverHttpClient(requests.Session):
         assert res_json is None

     def timeline_gc(
-        self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
+        self, tenant_id: TenantShardId, timeline_id: TimelineId, gc_horizon: Optional[int]
     ) -> dict[str, Any]:
         """
         Unlike most handlers, this will wait for the layers to be actually
@@ -540,7 +540,7 @@ class PageserverHttpClient(requests.Session):
         return res_json

     def timeline_checkpoint(
-        self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
+        self, tenant_id: TenantShardId, timeline_id: TimelineId, force_repartition=False
     ):
         self.is_testing_enabled_or_skip()
         query = {}
@@ -682,6 +682,34 @@ class PageserverHttpClient(requests.Session):
         assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
         return results[0].value

+    def get_metrics_values(
+        self, names: list[str], filter: Optional[Dict[str, str]] = None
+    ) -> Dict[str, float]:
+        """
+        When fetching multiple named metrics, it is more efficient to use this
+        than to call `get_metric_value` repeatedly.
+
+        Throws RuntimeError if no metrics matching `names` are found, or if
+        not all of `names` are found: this method is intended for loading sets
+        of metrics whose existence is coupled.
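+
+        Illustrative usage (these metric names are made up for the example):
+
+            values = pageserver_http.get_metrics_values(
+                ["pageserver_metric_a", "pageserver_metric_b"]
+            )
+            delta = values["pageserver_metric_a"] - values["pageserver_metric_b"]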
+ """ + metrics = self.get_metrics() + samples = [] + for name in names: + samples.extend(metrics.query_all(name, filter=filter)) + + result = {} + for sample in samples: + if sample.name in result: + raise RuntimeError(f"Multiple values found for {sample.name}") + result[sample.name] = sample.value + + if len(result) != len(names): + log.info(f"Metrics found: {metrics.metrics}") + raise RuntimeError(f"could not find all metrics {' '.join(names)}") + + return result + def layer_map_info( self, tenant_id: TenantId, diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index e7b78cfb9a..0dd771568a 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -6,7 +6,7 @@ from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.remote_storage import RemoteStorageKind, S3Storage -from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.utils import wait_until @@ -22,7 +22,7 @@ def assert_tenant_state( def remote_consistent_lsn( - pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId + pageserver_http: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId ) -> Lsn: detail = pageserver_http.timeline_detail(tenant, timeline) @@ -39,7 +39,7 @@ def remote_consistent_lsn( def wait_for_upload( pageserver_http: PageserverHttpClient, - tenant: TenantId, + tenant: TenantShardId, timeline: TimelineId, lsn: Lsn, ): @@ -92,7 +92,7 @@ def wait_until_tenant_state( def wait_until_timeline_state( pageserver_http: PageserverHttpClient, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, expected_state: str, iterations: int, @@ -141,7 +141,7 @@ def wait_until_tenant_active( def last_record_lsn( - pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId + pageserver_http_client: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId ) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) @@ -152,7 +152,7 @@ def last_record_lsn( def wait_for_last_record_lsn( pageserver_http: PageserverHttpClient, - tenant: TenantId, + tenant: TenantShardId, timeline: TimelineId, lsn: Lsn, ) -> Lsn: @@ -194,7 +194,7 @@ def wait_for_upload_queue_empty( def wait_timeline_detail_404( pageserver_http: PageserverHttpClient, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, iterations: int, interval: Optional[float] = None, @@ -219,7 +219,7 @@ def wait_timeline_detail_404( def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, iterations: int = 20, interval: Optional[float] = None, diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index 241531437c..30def1194d 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -5,6 +5,7 @@ from fixtures.neon_fixtures import ( Endpoint, NeonEnv, last_flush_lsn_upload, + tenant_get_shards, wait_for_last_flush_lsn, ) from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload @@ -31,7 +32,7 @@ class Workload: self._endpoint: Optional[Endpoint] = None - def endpoint(self, pageserver_id: int) -> Endpoint: + def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint: 
         if self._endpoint is None:
             self._endpoint = self.env.endpoints.create(
                 "main",
@@ -54,7 +55,7 @@ class Workload:
         if self._endpoint is not None:
             self._endpoint.stop()

-    def init(self, pageserver_id: int):
+    def init(self, pageserver_id: Optional[int] = None):
         endpoint = self.endpoint(pageserver_id)

         endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
@@ -63,7 +64,7 @@ class Workload:
             self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
         )

-    def write_rows(self, n, pageserver_id):
+    def write_rows(self, n, pageserver_id: Optional[int] = None):
         endpoint = self.endpoint(pageserver_id)
         start = self.expect_rows
         end = start + n - 1
@@ -81,7 +82,7 @@ class Workload:
             self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
         )

-    def churn_rows(self, n, pageserver_id, upload=True):
+    def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
         assert self.expect_rows >= n

         max_iters = 10
@@ -119,21 +120,24 @@ class Workload:
             ]
         )

-        last_flush_lsn = wait_for_last_flush_lsn(
-            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
-        )
-        ps_http = self.env.get_pageserver(pageserver_id).http_client()
-        wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
+        for tenant_shard_id, pageserver in tenant_get_shards(
+            self.env, self.tenant_id, pageserver_id
+        ):
+            last_flush_lsn = wait_for_last_flush_lsn(
+                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+            )
+            ps_http = pageserver.http_client()
+            wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)

-        if upload:
-            # force a checkpoint to trigger upload
-            ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
-            wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
-            log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
-        else:
-            log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
+            if upload:
+                # force a checkpoint to trigger upload
+                ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
+                wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
+                log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
+            else:
+                log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

-    def validate(self, pageserver_id):
+    def validate(self, pageserver_id: Optional[int] = None):
         endpoint = self.endpoint(pageserver_id)

         result = endpoint.safe_psql_many(
             [