From bfeb428d1b367351df11b7b17a75cff1c3c64ff6 Mon Sep 17 00:00:00 2001
From: Dmitry Rodionov
Date: Fri, 7 Apr 2023 13:47:28 +0300
Subject: [PATCH] tests: make neon_fixtures a bit thinner by splitting out
 some pageserver-related helpers (#3977)

neon_fixtures is quite big and messy; let's clean it up a bit.
---
 test_runner/fixtures/neon_fixtures.py         | 702 +-----------------
 test_runner/fixtures/pageserver/__init__.py   |   0
 test_runner/fixtures/pageserver/http.py       | 545 ++++++++++++++
 test_runner/fixtures/pageserver/utils.py      | 145 ++++
 test_runner/fixtures/utils.py                 |  16 +
 .../performance/test_branch_creation.py       |   2 +-
 test_runner/regress/test_auth.py              |   3 +-
 test_runner/regress/test_compatibility.py     |   5 +-
 .../regress/test_disk_usage_eviction.py       |  10 +-
 test_runner/regress/test_import.py            |   3 +-
 test_runner/regress/test_layer_eviction.py    |   3 +-
 test_runner/regress/test_neon_cli.py          |   2 +-
 test_runner/regress/test_normal_work.py       |   3 +-
 test_runner/regress/test_ondemand_download.py |  12 +-
 test_runner/regress/test_pageserver_api.py    |   2 +-
 test_runner/regress/test_read_trace.py        |   3 +-
 test_runner/regress/test_readonly_node.py     |   3 +-
 test_runner/regress/test_remote_storage.py    |  33 +-
 test_runner/regress/test_tenant_conf.py       |   3 +-
 test_runner/regress/test_tenant_detach.py     |   8 +-
 test_runner/regress/test_tenant_relocation.py |  12 +-
 test_runner/regress/test_tenant_size.py       |   2 +-
 .../test_tenants_with_remote_storage.py       |   6 +-
 test_runner/regress/test_timeline_delete.py   |   3 +-
 test_runner/regress/test_timeline_size.py     |  26 +-
 test_runner/regress/test_wal_acceptor.py      |   3 +-
 .../test_walredo_not_left_behind_on_detach.py |   3 +-
 27 files changed, 779 insertions(+), 779 deletions(-)
 create mode 100644 test_runner/fixtures/pageserver/__init__.py
 create mode 100644 test_runner/fixtures/pageserver/http.py
 create mode 100644 test_runner/fixtures/pageserver/utils.py

diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index c24158e9ec..5b6f2e5c96 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -14,7 +14,6 @@ import tempfile
 import textwrap
 import time
 import uuid
-from collections import defaultdict
 from contextlib import closing, contextmanager
 from dataclasses import dataclass, field
 from datetime import datetime
@@ -44,11 +43,11 @@ from psycopg2.extensions import make_dsn, parse_dsn
 from typing_extensions import Literal
 
 from fixtures.log_helper import log
-from fixtures.metrics import Metrics, parse_metrics
+from fixtures.pageserver.http import PageserverHttpClient
+from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import (
     ATTACHMENT_NAME_REGEX,
-    Fn,
     allure_add_grafana_links,
     allure_attach_from_dir,
     get_self_dir,
@@ -1120,538 +1119,6 @@ def neon_env_builder(
     yield builder
 
 
-class PageserverApiException(Exception):
-    def __init__(self, message, status_code: int):
-        super().__init__(message)
-        self.status_code = status_code
-
-
-class PageserverHttpClient(requests.Session):
-    def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
-        super().__init__()
-        self.port = port
-        self.auth_token = auth_token
-        self.is_testing_enabled_or_skip = is_testing_enabled_or_skip
-
-        if auth_token is not None:
-            self.headers["Authorization"] = f"Bearer {auth_token}"
-
-    def verbose_error(self, res: requests.Response):
-        try:
-            res.raise_for_status()
-        except requests.RequestException as e:
-            try:
-                msg = 
res.json()["msg"] - except: # noqa: E722 - msg = "" - raise PageserverApiException(msg, res.status_code) from e - - def check_status(self): - self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() - - def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]): - self.is_testing_enabled_or_skip() - - if isinstance(config_strings, tuple): - pairs = [config_strings] - else: - pairs = config_strings - - log.info(f"Requesting config failpoints: {repr(pairs)}") - - res = self.put( - f"http://localhost:{self.port}/v1/failpoints", - json=[{"name": name, "actions": actions} for name, actions in pairs], - ) - log.info(f"Got failpoints request response code {res.status_code}") - self.verbose_error(res) - res_json = res.json() - assert res_json is None - return res_json - - def tenant_list(self) -> List[Dict[Any, Any]]: - res = self.get(f"http://localhost:{self.port}/v1/tenant") - self.verbose_error(res) - res_json = res.json() - assert isinstance(res_json, list) - return res_json - - def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId: - res = self.post( - f"http://localhost:{self.port}/v1/tenant", - json={ - "new_tenant_id": str(new_tenant_id) if new_tenant_id else None, - }, - ) - self.verbose_error(res) - if res.status_code == 409: - raise Exception(f"could not create tenant: already exists for id {new_tenant_id}") - new_tenant_id = res.json() - assert isinstance(new_tenant_id, str) - return TenantId(new_tenant_id) - - def tenant_attach(self, tenant_id: TenantId): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach") - self.verbose_error(res) - - def tenant_detach(self, tenant_id: TenantId, detach_ignored=False): - params = {} - if detach_ignored: - params["detach_ignored"] = "true" - - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params) - self.verbose_error(res) - - def tenant_load(self, tenant_id: TenantId): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load") - self.verbose_error(res) - - def tenant_ignore(self, tenant_id: TenantId): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore") - self.verbose_error(res) - - def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]: - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") - self.verbose_error(res) - res_json = res.json() - assert isinstance(res_json, dict) - return res_json - - def tenant_config(self, tenant_id: TenantId) -> TenantConfig: - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config") - self.verbose_error(res) - return TenantConfig.from_json(res.json()) - - def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): - assert "tenant_id" not in config.keys() - res = self.put( - f"http://localhost:{self.port}/v1/tenant/config", - json={**config, "tenant_id": str(tenant_id)}, - ) - self.verbose_error(res) - - def patch_tenant_config_client_side( - self, - tenant_id: TenantId, - inserts: Optional[Dict[str, Any]] = None, - removes: Optional[List[str]] = None, - ): - current = self.tenant_config(tenant_id).tenant_specific_overrides - if inserts is not None: - current.update(inserts) - if removes is not None: - for key in removes: - del current[key] - self.set_tenant_config(tenant_id, current) - - def tenant_size(self, tenant_id: TenantId) -> int: - return self.tenant_size_and_modelinputs(tenant_id)[0] - - def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, 
Any]]: - """ - Returns the tenant size, together with the model inputs as the second tuple item. - """ - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size") - self.verbose_error(res) - res = res.json() - assert isinstance(res, dict) - assert TenantId(res["id"]) == tenant_id - size = res["size"] - assert type(size) == int - inputs = res["inputs"] - assert type(inputs) is dict - return (size, inputs) - - def tenant_size_debug(self, tenant_id: TenantId) -> str: - """ - Returns the tenant size debug info, as an HTML string - """ - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size", - headers={"Accept": "text/html"}, - ) - return res.text - - def timeline_list( - self, - tenant_id: TenantId, - include_non_incremental_logical_size: bool = False, - include_timeline_dir_layer_file_size_sum: bool = False, - ) -> List[Dict[str, Any]]: - params = {} - if include_non_incremental_logical_size: - params["include-non-incremental-logical-size"] = "true" - if include_timeline_dir_layer_file_size_sum: - params["include-timeline-dir-layer-file-size-sum"] = "true" - - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params - ) - self.verbose_error(res) - res_json = res.json() - assert isinstance(res_json, list) - return res_json - - def timeline_create( - self, - tenant_id: TenantId, - new_timeline_id: Optional[TimelineId] = None, - ancestor_timeline_id: Optional[TimelineId] = None, - ancestor_start_lsn: Optional[Lsn] = None, - ) -> Dict[Any, Any]: - res = self.post( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", - json={ - "new_timeline_id": str(new_timeline_id) if new_timeline_id else None, - "ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None, - "ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None, - }, - ) - self.verbose_error(res) - if res.status_code == 409: - raise Exception(f"could not create timeline: already exists for id {new_timeline_id}") - - res_json = res.json() - assert isinstance(res_json, dict) - return res_json - - def timeline_detail( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - include_non_incremental_logical_size: bool = False, - include_timeline_dir_layer_file_size_sum: bool = False, - **kwargs, - ) -> Dict[Any, Any]: - params = {} - if include_non_incremental_logical_size: - params["include-non-incremental-logical-size"] = "true" - if include_timeline_dir_layer_file_size_sum: - params["include-timeline-dir-layer-file-size-sum"] = "true" - - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", - params=params, - **kwargs, - ) - self.verbose_error(res) - res_json = res.json() - assert isinstance(res_json, dict) - return res_json - - def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId): - res = self.delete( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}" - ) - self.verbose_error(res) - res_json = res.json() - assert res_json is None - - def timeline_gc( - self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int] - ) -> dict[str, Any]: - self.is_testing_enabled_or_skip() - - log.info( - f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}" - ) - res = self.put( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc", - json={"gc_horizon": gc_horizon}, - ) - log.info(f"Got GC request response code: {res.status_code}") 
- self.verbose_error(res) - res_json = res.json() - assert res_json is not None - assert isinstance(res_json, dict) - return res_json - - def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId): - self.is_testing_enabled_or_skip() - - log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}") - res = self.put( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact" - ) - log.info(f"Got compact request response code: {res.status_code}") - self.verbose_error(res) - res_json = res.json() - assert res_json is None - - def timeline_get_lsn_by_timestamp( - self, tenant_id: TenantId, timeline_id: TimelineId, timestamp - ): - log.info( - f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}" - ) - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}", - ) - self.verbose_error(res) - res_json = res.json() - return res_json - - def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): - self.is_testing_enabled_or_skip() - - log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") - res = self.put( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint" - ) - log.info(f"Got checkpoint request response code: {res.status_code}") - self.verbose_error(res) - res_json = res.json() - assert res_json is None - - def timeline_spawn_download_remote_layers( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - max_concurrent_downloads: int, - ) -> dict[str, Any]: - body = { - "max_concurrent_downloads": max_concurrent_downloads, - } - res = self.post( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers", - json=body, - ) - self.verbose_error(res) - res_json = res.json() - assert res_json is not None - assert isinstance(res_json, dict) - return res_json - - def timeline_poll_download_remote_layers_status( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - spawn_response: dict[str, Any], - poll_state=None, - ) -> None | dict[str, Any]: - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers", - ) - self.verbose_error(res) - res_json = res.json() - assert res_json is not None - assert isinstance(res_json, dict) - - # assumption in this API client here is that nobody else spawns the task - assert res_json["task_id"] == spawn_response["task_id"] - - if poll_state is None or res_json["state"] == poll_state: - return res_json - return None - - def timeline_download_remote_layers( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - max_concurrent_downloads: int, - errors_ok=False, - at_least_one_download=True, - ): - res = self.timeline_spawn_download_remote_layers( - tenant_id, timeline_id, max_concurrent_downloads - ) - while True: - completed = self.timeline_poll_download_remote_layers_status( - tenant_id, timeline_id, res, poll_state="Completed" - ) - if not completed: - time.sleep(0.1) - continue - if not errors_ok: - assert completed["failed_download_count"] == 0 - if at_least_one_download: - assert completed["successful_download_count"] > 0 - return completed - - def get_metrics_str(self) -> str: - """You probably want to use get_metrics() instead.""" - res = self.get(f"http://localhost:{self.port}/metrics") - self.verbose_error(res) - return res.text - - def get_metrics(self) -> Metrics: - res = self.get_metrics_str() - return 
parse_metrics(res) - - def get_timeline_metric( - self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str - ) -> float: - metrics = self.get_metrics() - return metrics.query_one( - metric_name, - filter={ - "tenant_id": str(tenant_id), - "timeline_id": str(timeline_id), - }, - ).value - - def get_remote_timeline_client_metric( - self, - metric_name: str, - tenant_id: TenantId, - timeline_id: TimelineId, - file_kind: str, - op_kind: str, - ) -> Optional[float]: - metrics = self.get_metrics() - matches = metrics.query_all( - name=metric_name, - filter={ - "tenant_id": str(tenant_id), - "timeline_id": str(timeline_id), - "file_kind": str(file_kind), - "op_kind": str(op_kind), - }, - ) - if len(matches) == 0: - value = None - elif len(matches) == 1: - value = matches[0].value - assert value is not None - else: - assert len(matches) < 2, "above filter should uniquely identify metric" - return value - - def get_metric_value( - self, name: str, filter: Optional[Dict[str, str]] = None - ) -> Optional[float]: - metrics = self.get_metrics() - results = metrics.query_all(name, filter=filter) - if not results: - log.info(f'could not find metric "{name}"') - return None - assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}" - return results[0].value - - def layer_map_info( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - ) -> LayerMapInfo: - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/", - ) - self.verbose_error(res) - return LayerMapInfo.from_json(res.json()) - - def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str): - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}", - ) - self.verbose_error(res) - - assert res.status_code == 200 - - def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str): - res = self.delete( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}", - ) - self.verbose_error(res) - - assert res.status_code == 200 - - def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId): - info = self.layer_map_info(tenant_id, timeline_id) - for layer in info.historic_layers: - self.evict_layer(tenant_id, timeline_id, layer.layer_file_name) - - def disk_usage_eviction_run(self, request: dict[str, Any]): - res = self.put( - f"http://localhost:{self.port}/v1/disk_usage_eviction/run", - json=request, - ) - self.verbose_error(res) - return res.json() - - def tenant_break(self, tenant_id: TenantId): - res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break") - self.verbose_error(res) - - -@dataclass -class TenantConfig: - tenant_specific_overrides: Dict[str, Any] - effective_config: Dict[str, Any] - - @classmethod - def from_json(cls, d: Dict[str, Any]) -> TenantConfig: - return TenantConfig( - tenant_specific_overrides=d["tenant_specific_overrides"], - effective_config=d["effective_config"], - ) - - -@dataclass -class LayerMapInfo: - in_memory_layers: List[InMemoryLayerInfo] - historic_layers: List[HistoricLayerInfo] - - @classmethod - def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo: - info = LayerMapInfo(in_memory_layers=[], historic_layers=[]) - - json_in_memory_layers = d["in_memory_layers"] - assert isinstance(json_in_memory_layers, List) - for json_in_memory_layer in json_in_memory_layers: - info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer)) - - 
json_historic_layers = d["historic_layers"] - assert isinstance(json_historic_layers, List) - for json_historic_layer in json_historic_layers: - info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer)) - - return info - - def kind_count(self) -> Dict[str, int]: - counts: Dict[str, int] = defaultdict(int) - for inmem_layer in self.in_memory_layers: - counts[inmem_layer.kind] += 1 - for hist_layer in self.historic_layers: - counts[hist_layer.kind] += 1 - return counts - - -@dataclass -class InMemoryLayerInfo: - kind: str - lsn_start: str - lsn_end: Optional[str] - - @classmethod - def from_json(cls, d: Dict[str, Any]) -> InMemoryLayerInfo: - return InMemoryLayerInfo( - kind=d["kind"], - lsn_start=d["lsn_start"], - lsn_end=d.get("lsn_end"), - ) - - -@dataclass(frozen=True) -class HistoricLayerInfo: - kind: str - layer_file_name: str - layer_file_size: Optional[int] - lsn_start: str - lsn_end: Optional[str] - remote: bool - - @classmethod - def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo: - return HistoricLayerInfo( - kind=d["kind"], - layer_file_name=d["layer_file_name"], - layer_file_size=d.get("layer_file_size"), - lsn_start=d["lsn_start"], - lsn_end=d.get("lsn_end"), - remote=d["remote"], - ) - - @dataclass class PageserverPort: pg: int @@ -3386,151 +2853,6 @@ def check_restored_datadir_content( assert (mismatch, error) == ([], []) -def wait_until(number_of_iterations: int, interval: float, func): - """ - Wait until 'func' returns successfully, without exception. Returns the - last return value from the function. - """ - last_exception = None - for i in range(number_of_iterations): - try: - res = func() - except Exception as e: - log.info("waiting for %s iteration %s failed", func, i + 1) - last_exception = e - time.sleep(interval) - continue - return res - raise Exception("timed out while waiting for %s" % func) from last_exception - - -def wait_while(number_of_iterations: int, interval: float, func): - """ - Wait until 'func' returns false, or throws an exception. - """ - for i in range(number_of_iterations): - try: - if not func(): - return - log.info("waiting for %s iteration %s failed", func, i + 1) - time.sleep(interval) - continue - except Exception: - return - raise Exception("timed out while waiting for %s" % func) - - -def assert_tenant_status( - pageserver_http_client: PageserverHttpClient, tenant: TenantId, expected_status: str -): - tenant_status = pageserver_http_client.tenant_status(tenant) - log.info(f"tenant_status: {tenant_status}") - assert tenant_status["state"] == expected_status, tenant_status - - -def tenant_exists(ps_http: PageserverHttpClient, tenant_id: TenantId): - tenants = ps_http.tenant_list() - matching = [t for t in tenants if TenantId(t["id"]) == tenant_id] - assert len(matching) < 2 - if len(matching) == 0: - return None - return matching[0] - - -def remote_consistent_lsn( - pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId -) -> Lsn: - detail = pageserver_http_client.timeline_detail(tenant, timeline) - - if detail["remote_consistent_lsn"] is None: - # No remote information at all. This happens right after creating - # a timeline, before any part of it has been uploaded to remote - # storage yet. 
- return Lsn(0) - else: - lsn_str = detail["remote_consistent_lsn"] - assert isinstance(lsn_str, str) - return Lsn(lsn_str) - - -def wait_for_upload( - pageserver_http_client: PageserverHttpClient, - tenant: TenantId, - timeline: TimelineId, - lsn: Lsn, -): - """waits for local timeline upload up to specified lsn""" - for i in range(20): - current_lsn = remote_consistent_lsn(pageserver_http_client, tenant, timeline) - if current_lsn >= lsn: - log.info("wait finished") - return - log.info( - "waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format( - lsn, current_lsn, i + 1 - ) - ) - time.sleep(1) - raise Exception( - "timed out while waiting for remote_consistent_lsn to reach {}, was {}".format( - lsn, current_lsn - ) - ) - - -# Does not use `wait_until` for debugging purposes -def wait_until_tenant_state( - pageserver_http: PageserverHttpClient, - tenant_id: TenantId, - expected_state: str, - iterations: int, -) -> bool: - for _ in range(iterations): - try: - tenant = pageserver_http.tenant_status(tenant_id=tenant_id) - log.debug(f"Tenant {tenant_id} data: {tenant}") - if tenant["state"] == expected_state: - return True - except Exception as e: - log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") - - time.sleep(1) - - raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds") - - -def last_record_lsn( - pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId -) -> Lsn: - detail = pageserver_http_client.timeline_detail(tenant, timeline) - - lsn_str = detail["last_record_lsn"] - assert isinstance(lsn_str, str) - return Lsn(lsn_str) - - -def wait_for_last_record_lsn( - pageserver_http_client: PageserverHttpClient, - tenant: TenantId, - timeline: TimelineId, - lsn: Lsn, -) -> Lsn: - """waits for pageserver to catch up to a certain lsn, returns the last observed lsn.""" - for i in range(10): - current_lsn = last_record_lsn(pageserver_http_client, tenant, timeline) - if current_lsn >= lsn: - return current_lsn - log.info( - "waiting for last_record_lsn to reach {}, now {}, iteration {}".format( - lsn, current_lsn, i + 1 - ) - ) - time.sleep(1) - raise Exception( - "timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn) - ) - - def wait_for_last_flush_lsn( env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId ) -> Lsn: @@ -3592,23 +2914,3 @@ def wait_for_sk_commit_lsn_to_reach_remote_storage( ps_http.timeline_checkpoint(tenant_id, timeline_id) wait_for_upload(ps_http, tenant_id, timeline_id, lsn) return lsn - - -def wait_for_upload_queue_empty( - pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId -): - ps_http = pageserver.http_client() - while True: - all_metrics = ps_http.get_metrics() - tl = all_metrics.query_all( - "pageserver_remote_timeline_client_calls_unfinished", - { - "tenant_id": str(tenant_id), - "timeline_id": str(timeline_id), - }, - ) - assert len(tl) > 0 - log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}") - if all(m.value == 0 for m in tl): - return - time.sleep(0.2) diff --git a/test_runner/fixtures/pageserver/__init__.py b/test_runner/fixtures/pageserver/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py new file mode 100644 index 0000000000..1e1effe295 --- /dev/null +++ b/test_runner/fixtures/pageserver/http.py @@ -0,0 +1,545 @@ +from __future__ import annotations + +import time +from collections 
import defaultdict +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + +import requests + +from fixtures.log_helper import log +from fixtures.metrics import Metrics, parse_metrics +from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.utils import Fn + + +class PageserverApiException(Exception): + def __init__(self, message, status_code: int): + super().__init__(message) + self.status_code = status_code + + +@dataclass +class InMemoryLayerInfo: + kind: str + lsn_start: str + lsn_end: Optional[str] + + @classmethod + def from_json(cls, d: Dict[str, Any]) -> InMemoryLayerInfo: + return InMemoryLayerInfo( + kind=d["kind"], + lsn_start=d["lsn_start"], + lsn_end=d.get("lsn_end"), + ) + + +@dataclass(frozen=True) +class HistoricLayerInfo: + kind: str + layer_file_name: str + layer_file_size: Optional[int] + lsn_start: str + lsn_end: Optional[str] + remote: bool + + @classmethod + def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo: + return HistoricLayerInfo( + kind=d["kind"], + layer_file_name=d["layer_file_name"], + layer_file_size=d.get("layer_file_size"), + lsn_start=d["lsn_start"], + lsn_end=d.get("lsn_end"), + remote=d["remote"], + ) + + +@dataclass +class LayerMapInfo: + in_memory_layers: List[InMemoryLayerInfo] + historic_layers: List[HistoricLayerInfo] + + @classmethod + def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo: + info = LayerMapInfo(in_memory_layers=[], historic_layers=[]) + + json_in_memory_layers = d["in_memory_layers"] + assert isinstance(json_in_memory_layers, List) + for json_in_memory_layer in json_in_memory_layers: + info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer)) + + json_historic_layers = d["historic_layers"] + assert isinstance(json_historic_layers, List) + for json_historic_layer in json_historic_layers: + info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer)) + + return info + + def kind_count(self) -> Dict[str, int]: + counts: Dict[str, int] = defaultdict(int) + for inmem_layer in self.in_memory_layers: + counts[inmem_layer.kind] += 1 + for hist_layer in self.historic_layers: + counts[hist_layer.kind] += 1 + return counts + + +@dataclass +class TenantConfig: + tenant_specific_overrides: Dict[str, Any] + effective_config: Dict[str, Any] + + @classmethod + def from_json(cls, d: Dict[str, Any]) -> TenantConfig: + return TenantConfig( + tenant_specific_overrides=d["tenant_specific_overrides"], + effective_config=d["effective_config"], + ) + + +class PageserverHttpClient(requests.Session): + def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None): + super().__init__() + self.port = port + self.auth_token = auth_token + self.is_testing_enabled_or_skip = is_testing_enabled_or_skip + + if auth_token is not None: + self.headers["Authorization"] = f"Bearer {auth_token}" + + def verbose_error(self, res: requests.Response): + try: + res.raise_for_status() + except requests.RequestException as e: + try: + msg = res.json()["msg"] + except: # noqa: E722 + msg = "" + raise PageserverApiException(msg, res.status_code) from e + + def check_status(self): + self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + + def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]): + self.is_testing_enabled_or_skip() + + if isinstance(config_strings, tuple): + pairs = [config_strings] + else: + pairs = config_strings + + log.info(f"Requesting config failpoints: {repr(pairs)}") + + res = 
self.put( + f"http://localhost:{self.port}/v1/failpoints", + json=[{"name": name, "actions": actions} for name, actions in pairs], + ) + log.info(f"Got failpoints request response code {res.status_code}") + self.verbose_error(res) + res_json = res.json() + assert res_json is None + return res_json + + def tenant_list(self) -> List[Dict[Any, Any]]: + res = self.get(f"http://localhost:{self.port}/v1/tenant") + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, list) + return res_json + + def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId: + res = self.post( + f"http://localhost:{self.port}/v1/tenant", + json={ + "new_tenant_id": str(new_tenant_id) if new_tenant_id else None, + }, + ) + self.verbose_error(res) + if res.status_code == 409: + raise Exception(f"could not create tenant: already exists for id {new_tenant_id}") + new_tenant_id = res.json() + assert isinstance(new_tenant_id, str) + return TenantId(new_tenant_id) + + def tenant_attach(self, tenant_id: TenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach") + self.verbose_error(res) + + def tenant_detach(self, tenant_id: TenantId, detach_ignored=False): + params = {} + if detach_ignored: + params["detach_ignored"] = "true" + + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params) + self.verbose_error(res) + + def tenant_load(self, tenant_id: TenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load") + self.verbose_error(res) + + def tenant_ignore(self, tenant_id: TenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore") + self.verbose_error(res) + + def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]: + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + + def tenant_config(self, tenant_id: TenantId) -> TenantConfig: + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config") + self.verbose_error(res) + return TenantConfig.from_json(res.json()) + + def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): + assert "tenant_id" not in config.keys() + res = self.put( + f"http://localhost:{self.port}/v1/tenant/config", + json={**config, "tenant_id": str(tenant_id)}, + ) + self.verbose_error(res) + + def patch_tenant_config_client_side( + self, + tenant_id: TenantId, + inserts: Optional[Dict[str, Any]] = None, + removes: Optional[List[str]] = None, + ): + current = self.tenant_config(tenant_id).tenant_specific_overrides + if inserts is not None: + current.update(inserts) + if removes is not None: + for key in removes: + del current[key] + self.set_tenant_config(tenant_id, current) + + def tenant_size(self, tenant_id: TenantId) -> int: + return self.tenant_size_and_modelinputs(tenant_id)[0] + + def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]: + """ + Returns the tenant size, together with the model inputs as the second tuple item. 
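+        Size is the synthetic size reported by the tenant's synthetic_size endpoint.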
+ """ + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size") + self.verbose_error(res) + res = res.json() + assert isinstance(res, dict) + assert TenantId(res["id"]) == tenant_id + size = res["size"] + assert type(size) == int + inputs = res["inputs"] + assert type(inputs) is dict + return (size, inputs) + + def tenant_size_debug(self, tenant_id: TenantId) -> str: + """ + Returns the tenant size debug info, as an HTML string + """ + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size", + headers={"Accept": "text/html"}, + ) + return res.text + + def timeline_list( + self, + tenant_id: TenantId, + include_non_incremental_logical_size: bool = False, + include_timeline_dir_layer_file_size_sum: bool = False, + ) -> List[Dict[str, Any]]: + params = {} + if include_non_incremental_logical_size: + params["include-non-incremental-logical-size"] = "true" + if include_timeline_dir_layer_file_size_sum: + params["include-timeline-dir-layer-file-size-sum"] = "true" + + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params + ) + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, list) + return res_json + + def timeline_create( + self, + tenant_id: TenantId, + new_timeline_id: Optional[TimelineId] = None, + ancestor_timeline_id: Optional[TimelineId] = None, + ancestor_start_lsn: Optional[Lsn] = None, + ) -> Dict[Any, Any]: + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", + json={ + "new_timeline_id": str(new_timeline_id) if new_timeline_id else None, + "ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None, + "ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None, + }, + ) + self.verbose_error(res) + if res.status_code == 409: + raise Exception(f"could not create timeline: already exists for id {new_timeline_id}") + + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + + def timeline_detail( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + include_non_incremental_logical_size: bool = False, + include_timeline_dir_layer_file_size_sum: bool = False, + **kwargs, + ) -> Dict[Any, Any]: + params = {} + if include_non_incremental_logical_size: + params["include-non-incremental-logical-size"] = "true" + if include_timeline_dir_layer_file_size_sum: + params["include-timeline-dir-layer-file-size-sum"] = "true" + + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", + params=params, + **kwargs, + ) + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + + def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId): + res = self.delete( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}" + ) + self.verbose_error(res) + res_json = res.json() + assert res_json is None + + def timeline_gc( + self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int] + ) -> dict[str, Any]: + self.is_testing_enabled_or_skip() + + log.info( + f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}" + ) + res = self.put( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc", + json={"gc_horizon": gc_horizon}, + ) + log.info(f"Got GC request response code: {res.status_code}") + self.verbose_error(res) + res_json = res.json() + assert res_json is not None + assert 
isinstance(res_json, dict) + return res_json + + def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId): + self.is_testing_enabled_or_skip() + + log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}") + res = self.put( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact" + ) + log.info(f"Got compact request response code: {res.status_code}") + self.verbose_error(res) + res_json = res.json() + assert res_json is None + + def timeline_get_lsn_by_timestamp( + self, tenant_id: TenantId, timeline_id: TimelineId, timestamp + ): + log.info( + f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}" + ) + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}", + ) + self.verbose_error(res) + res_json = res.json() + return res_json + + def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): + self.is_testing_enabled_or_skip() + + log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") + res = self.put( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint" + ) + log.info(f"Got checkpoint request response code: {res.status_code}") + self.verbose_error(res) + res_json = res.json() + assert res_json is None + + def timeline_spawn_download_remote_layers( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + max_concurrent_downloads: int, + ) -> dict[str, Any]: + body = { + "max_concurrent_downloads": max_concurrent_downloads, + } + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers", + json=body, + ) + self.verbose_error(res) + res_json = res.json() + assert res_json is not None + assert isinstance(res_json, dict) + return res_json + + def timeline_poll_download_remote_layers_status( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + spawn_response: dict[str, Any], + poll_state=None, + ) -> None | dict[str, Any]: + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers", + ) + self.verbose_error(res) + res_json = res.json() + assert res_json is not None + assert isinstance(res_json, dict) + + # assumption in this API client here is that nobody else spawns the task + assert res_json["task_id"] == spawn_response["task_id"] + + if poll_state is None or res_json["state"] == poll_state: + return res_json + return None + + def timeline_download_remote_layers( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + max_concurrent_downloads: int, + errors_ok=False, + at_least_one_download=True, + ): + res = self.timeline_spawn_download_remote_layers( + tenant_id, timeline_id, max_concurrent_downloads + ) + while True: + completed = self.timeline_poll_download_remote_layers_status( + tenant_id, timeline_id, res, poll_state="Completed" + ) + if not completed: + time.sleep(0.1) + continue + if not errors_ok: + assert completed["failed_download_count"] == 0 + if at_least_one_download: + assert completed["successful_download_count"] > 0 + return completed + + def get_metrics_str(self) -> str: + """You probably want to use get_metrics() instead.""" + res = self.get(f"http://localhost:{self.port}/metrics") + self.verbose_error(res) + return res.text + + def get_metrics(self) -> Metrics: + res = self.get_metrics_str() + return parse_metrics(res) + + def get_timeline_metric( + self, tenant_id: TenantId, 
timeline_id: TimelineId, metric_name: str + ) -> float: + metrics = self.get_metrics() + return metrics.query_one( + metric_name, + filter={ + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + }, + ).value + + def get_remote_timeline_client_metric( + self, + metric_name: str, + tenant_id: TenantId, + timeline_id: TimelineId, + file_kind: str, + op_kind: str, + ) -> Optional[float]: + metrics = self.get_metrics() + matches = metrics.query_all( + name=metric_name, + filter={ + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "file_kind": str(file_kind), + "op_kind": str(op_kind), + }, + ) + if len(matches) == 0: + value = None + elif len(matches) == 1: + value = matches[0].value + assert value is not None + else: + assert len(matches) < 2, "above filter should uniquely identify metric" + return value + + def get_metric_value( + self, name: str, filter: Optional[Dict[str, str]] = None + ) -> Optional[float]: + metrics = self.get_metrics() + results = metrics.query_all(name, filter=filter) + if not results: + log.info(f'could not find metric "{name}"') + return None + assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}" + return results[0].value + + def layer_map_info( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> LayerMapInfo: + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/", + ) + self.verbose_error(res) + return LayerMapInfo.from_json(res.json()) + + def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str): + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}", + ) + self.verbose_error(res) + + assert res.status_code == 200 + + def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str): + res = self.delete( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}", + ) + self.verbose_error(res) + + assert res.status_code == 200 + + def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId): + info = self.layer_map_info(tenant_id, timeline_id) + for layer in info.historic_layers: + self.evict_layer(tenant_id, timeline_id, layer.layer_file_name) + + def disk_usage_eviction_run(self, request: dict[str, Any]): + res = self.put( + f"http://localhost:{self.port}/v1/disk_usage_eviction/run", + json=request, + ) + self.verbose_error(res) + return res.json() + + def tenant_break(self, tenant_id: TenantId): + res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break") + self.verbose_error(res) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py new file mode 100644 index 0000000000..65eda5b636 --- /dev/null +++ b/test_runner/fixtures/pageserver/utils.py @@ -0,0 +1,145 @@ +import time + +from fixtures.log_helper import log +from fixtures.pageserver.http import PageserverHttpClient +from fixtures.types import Lsn, TenantId, TimelineId + + +def assert_tenant_status( + pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str +): + tenant_status = pageserver_http.tenant_status(tenant) + log.info(f"tenant_status: {tenant_status}") + assert tenant_status["state"] == expected_status, tenant_status + + +def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId): + tenants = pageserver_http.tenant_list() + matching = [t for t in tenants if TenantId(t["id"]) == tenant_id] + assert len(matching) < 2 
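+    # tenant ids are unique, so the filter above can match at most one entry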
+ if len(matching) == 0: + return None + return matching[0] + + +def remote_consistent_lsn( + pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId +) -> Lsn: + detail = pageserver_http.timeline_detail(tenant, timeline) + + if detail["remote_consistent_lsn"] is None: + # No remote information at all. This happens right after creating + # a timeline, before any part of it has been uploaded to remote + # storage yet. + return Lsn(0) + else: + lsn_str = detail["remote_consistent_lsn"] + assert isinstance(lsn_str, str) + return Lsn(lsn_str) + + +def wait_for_upload( + pageserver_http: PageserverHttpClient, + tenant: TenantId, + timeline: TimelineId, + lsn: Lsn, +): + """waits for local timeline upload up to specified lsn""" + for i in range(20): + current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline) + if current_lsn >= lsn: + log.info("wait finished") + return + log.info( + "waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format( + lsn, current_lsn, i + 1 + ) + ) + time.sleep(1) + raise Exception( + "timed out while waiting for remote_consistent_lsn to reach {}, was {}".format( + lsn, current_lsn + ) + ) + + +def wait_until_tenant_state( + pageserver_http: PageserverHttpClient, + tenant_id: TenantId, + expected_state: str, + iterations: int, +) -> bool: + """ + Does not use `wait_until` for debugging purposes + """ + for _ in range(iterations): + try: + tenant = pageserver_http.tenant_status(tenant_id=tenant_id) + log.debug(f"Tenant {tenant_id} data: {tenant}") + if tenant["state"] == expected_state: + return True + except Exception as e: + log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") + + time.sleep(1) + + raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds") + + +def wait_until_tenant_active( + pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30 +): + wait_until_tenant_state( + pageserver_http, tenant_id, expected_state="Active", iterations=iterations + ) + + +def last_record_lsn( + pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId +) -> Lsn: + detail = pageserver_http_client.timeline_detail(tenant, timeline) + + lsn_str = detail["last_record_lsn"] + assert isinstance(lsn_str, str) + return Lsn(lsn_str) + + +def wait_for_last_record_lsn( + pageserver_http: PageserverHttpClient, + tenant: TenantId, + timeline: TimelineId, + lsn: Lsn, +) -> Lsn: + """waits for pageserver to catch up to a certain lsn, returns the last observed lsn.""" + for i in range(10): + current_lsn = last_record_lsn(pageserver_http, tenant, timeline) + if current_lsn >= lsn: + return current_lsn + log.info( + "waiting for last_record_lsn to reach {}, now {}, iteration {}".format( + lsn, current_lsn, i + 1 + ) + ) + time.sleep(1) + raise Exception( + "timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn) + ) + + +def wait_for_upload_queue_empty( + pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId +): + while True: + all_metrics = pageserver_http.get_metrics() + tl = all_metrics.query_all( + "pageserver_remote_timeline_client_calls_unfinished", + { + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + }, + ) + assert len(tl) > 0 + log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}") + if all(m.value == 0 for m in tl): + return + time.sleep(0.2) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index b58539ca86..71df74dfba 100644 
--- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -278,3 +278,19 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn): continue return res raise Exception("timed out while waiting for %s" % func) from last_exception + + +def wait_while(number_of_iterations: int, interval: float, func): + """ + Wait until 'func' returns false, or throws an exception. + """ + for i in range(number_of_iterations): + try: + if not func(): + return + log.info("waiting for %s iteration %s failed", func, i + 1) + time.sleep(interval) + continue + except Exception: + return + raise Exception("timed out while waiting for %s" % func) diff --git a/test_runner/performance/test_branch_creation.py b/test_runner/performance/test_branch_creation.py index 4b109c150f..16c5438b8f 100644 --- a/test_runner/performance/test_branch_creation.py +++ b/test_runner/performance/test_branch_creation.py @@ -10,7 +10,7 @@ import pytest from fixtures.benchmark_fixture import MetricReport from fixtures.compare_fixtures import NeonCompare from fixtures.log_helper import log -from fixtures.neon_fixtures import wait_for_last_record_lsn +from fixtures.pageserver.utils import wait_for_last_record_lsn from fixtures.types import Lsn diff --git a/test_runner/regress/test_auth.py b/test_runner/regress/test_auth.py index f3d153d934..f7c4736e04 100644 --- a/test_runner/regress/test_auth.py +++ b/test_runner/regress/test_auth.py @@ -1,7 +1,8 @@ from contextlib import closing import pytest -from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException, PgProtocol +from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol +from fixtures.pageserver.http import PageserverApiException from fixtures.types import TenantId diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index be6e1a69b2..0cc111bd8c 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -10,12 +10,11 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonCli, NeonEnvBuilder, - PageserverHttpClient, PgBin, PortDistributor, - wait_for_last_record_lsn, - wait_for_upload, ) +from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload from fixtures.types import Lsn from pytest import FixtureRequest diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index 6ed09734fe..413d6c9d5a 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -11,14 +11,14 @@ from fixtures.neon_fixtures import ( LocalFsStorage, NeonEnv, NeonEnvBuilder, - PageserverHttpClient, PgBin, RemoteStorageKind, wait_for_last_flush_lsn, - wait_for_upload_queue_empty, - wait_until, ) +from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import wait_for_upload_queue_empty from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.utils import wait_until GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy" @@ -138,7 +138,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev # remove the initial tenant ## why wait for upload queue? 
=> https://github.com/neondatabase/neon/issues/3865 assert env.initial_timeline - wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, env.initial_timeline) + wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline) pageserver_http.tenant_detach(env.initial_tenant) assert isinstance(env.remote_storage, LocalFsStorage) tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant) @@ -182,7 +182,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev # after stopping the safekeepers, we know that no new WAL will be coming in for tenant_id, timeline_id in timelines: pageserver_http.timeline_checkpoint(tenant_id, timeline_id) - wait_for_upload_queue_empty(env.pageserver, tenant_id, timeline_id) + wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id) tl_info = pageserver_http.timeline_detail(tenant_id, timeline_id) assert tl_info["last_record_lsn"] == tl_info["disk_consistent_lsn"] assert tl_info["disk_consistent_lsn"] == tl_info["remote_consistent_lsn"] diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 1dc10fbf4f..774ed98563 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -13,9 +13,8 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, Postgres, - wait_for_last_record_lsn, - wait_for_upload, ) +from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import subprocess_capture diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index 80e7ae8d7e..2d07d02ce7 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -6,10 +6,9 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, RemoteStorageKind, wait_for_last_flush_lsn, - wait_for_last_record_lsn, wait_for_sk_commit_lsn_to_reach_remote_storage, - wait_for_upload, ) +from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import query_scalar diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index d146f78c3a..cd481e69eb 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -5,8 +5,8 @@ from fixtures.neon_fixtures import ( DEFAULT_BRANCH_NAME, NeonEnv, NeonEnvBuilder, - PageserverHttpClient, ) +from fixtures.pageserver.http import PageserverHttpClient from fixtures.types import TenantId, TimelineId diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py index 73933021a4..aa37a2411c 100644 --- a/test_runner/regress/test_normal_work.py +++ b/test_runner/regress/test_normal_work.py @@ -1,6 +1,7 @@ import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PageserverHttpClient +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder +from fixtures.pageserver.http import PageserverHttpClient def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient): diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index fd13651427..90ab8e68d8 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -10,20 +10,20 @@ import pytest from fixtures.log_helper import log from 
fixtures.neon_fixtures import (
     NeonEnvBuilder,
-    PageserverApiException,
-    PageserverHttpClient,
     RemoteStorageKind,
-    assert_tenant_status,
     available_remote_storages,
     wait_for_last_flush_lsn,
-    wait_for_last_record_lsn,
     wait_for_sk_commit_lsn_to_reach_remote_storage,
+)
+from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
+from fixtures.pageserver.utils import (
+    assert_tenant_status,
+    wait_for_last_record_lsn,
     wait_for_upload,
-    wait_until,
     wait_until_tenant_state,
 )
 from fixtures.types import Lsn
-from fixtures.utils import query_scalar
+from fixtures.utils import query_scalar, wait_until
 
 
 def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py
index eb22ac5f99..5b05989ae4 100644
--- a/test_runner/regress/test_pageserver_api.py
+++ b/test_runner/regress/test_pageserver_api.py
@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import (
     DEFAULT_BRANCH_NAME,
     NeonEnv,
     NeonEnvBuilder,
-    PageserverHttpClient,
 )
+from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import wait_until
 
diff --git a/test_runner/regress/test_read_trace.py b/test_runner/regress/test_read_trace.py
index 1b00b272c2..be0eb76ccd 100644
--- a/test_runner/regress/test_read_trace.py
+++ b/test_runner/regress/test_read_trace.py
@@ -1,6 +1,7 @@
 from contextlib import closing
 
-from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_record_lsn
+from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.pageserver.utils import wait_for_last_record_lsn
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import query_scalar
 
diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py
index 7487757071..69d6e427ce 100644
--- a/test_runner/regress/test_readonly_node.py
+++ b/test_runner/regress/test_readonly_node.py
@@ -1,6 +1,7 @@
 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, wait_for_last_record_lsn
+from fixtures.neon_fixtures import NeonEnv
+from fixtures.pageserver.utils import wait_for_last_record_lsn
 from fixtures.types import Lsn
 from fixtures.utils import query_scalar
 
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index f6600e8974..222305f006 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -13,13 +13,15 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     LocalFsStorage,
     NeonEnvBuilder,
-    PageserverApiException,
-    PageserverHttpClient,
     RemoteStorageKind,
     available_remote_storages,
     wait_for_last_flush_lsn,
+)
+from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
+from fixtures.pageserver.utils import (
     wait_for_last_record_lsn,
     wait_for_upload,
+    wait_until_tenant_active,
     wait_until_tenant_state,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
@@ -172,15 +174,10 @@ def test_remote_storage_backup_and_restore(
     client.tenant_attach(tenant_id)
 
     log.info("waiting for tenant to become active. this should be quick with on-demand download")
-    def tenant_active():
-        all_states = client.tenant_list()
-        [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
-        assert tenant["state"] == "Active"
-
-    wait_until(
-        number_of_iterations=5,
-        interval=1,
-        func=tenant_active,
+    wait_until_tenant_active(
+        pageserver_http=client,
+        tenant_id=tenant_id,
+        iterations=5,
     )
 
     detail = client.timeline_detail(tenant_id, timeline_id)
@@ -357,12 +354,7 @@ def test_remote_storage_upload_queue_retries(
 
     client.tenant_attach(tenant_id)
 
-    def tenant_active():
-        all_states = client.tenant_list()
-        [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
-        assert tenant["state"] == "Active"
-
-    wait_until(30, 1, tenant_active)
+    wait_until_tenant_active(client, tenant_id)
 
     log.info("restarting postgres to validate")
     pg = env.postgres.create_start("main", tenant_id=tenant_id)
@@ -497,12 +489,7 @@ def test_remote_timeline_client_calls_started_metric(
 
     client.tenant_attach(tenant_id)
 
-    def tenant_active():
-        all_states = client.tenant_list()
-        [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
-        assert tenant["state"] == "Active"
-
-    wait_until(30, 1, tenant_active)
+    wait_until_tenant_active(client, tenant_id)
 
     log.info("restarting postgres to validate")
     pg = env.postgres.create_start("main", tenant_id=tenant_id)
diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py
index c5f9a3d157..67aba227e5 100644
--- a/test_runner/regress/test_tenant_conf.py
+++ b/test_runner/regress/test_tenant_conf.py
@@ -6,9 +6,8 @@ from fixtures.neon_fixtures import (
     LocalFsStorage,
     NeonEnvBuilder,
     RemoteStorageKind,
-    assert_tenant_status,
-    wait_for_upload,
 )
+from fixtures.pageserver.utils import assert_tenant_status, wait_for_upload
 from fixtures.types import Lsn
 from fixtures.utils import wait_until
 
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index 5db79eef4a..58a010951e 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -9,18 +9,18 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
-    PageserverApiException,
-    PageserverHttpClient,
     Postgres,
     RemoteStorageKind,
     available_remote_storages,
+)
+from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
+from fixtures.pageserver.utils import (
     wait_for_last_record_lsn,
     wait_for_upload,
-    wait_until,
     wait_until_tenant_state,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import query_scalar
+from fixtures.utils import query_scalar, wait_until
 
 
 def do_gc_target(
diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py
index aaf33c0d59..8ad4bd1c11 100644
--- a/test_runner/regress/test_tenant_relocation.py
+++ b/test_runner/regress/test_tenant_relocation.py
@@ -10,18 +10,24 @@ from fixtures.neon_fixtures import (
     NeonBroker,
     NeonEnv,
     NeonEnvBuilder,
-    PageserverHttpClient,
     PortDistributor,
     Postgres,
+)
+from fixtures.pageserver.http import PageserverHttpClient
+from fixtures.pageserver.utils import (
     assert_tenant_status,
     tenant_exists,
     wait_for_last_record_lsn,
     wait_for_upload,
+)
+from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.utils import (
+    query_scalar,
+    start_in_background,
+    subprocess_capture,
     wait_until,
     wait_while,
 )
-from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import query_scalar, start_in_background, subprocess_capture
 
 
 def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py
index a4b5f7739a..9037fe0045 100644
--- a/test_runner/regress/test_tenant_size.py
+++ b/test_runner/regress/test_tenant_size.py
@@ -6,11 +6,11 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
-    PageserverHttpClient,
     Postgres,
     wait_for_last_flush_lsn,
     wait_for_wal_insert_lsn,
 )
+from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.types import Lsn, TenantId, TimelineId
 
 
diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py
index c786f8a8e1..ec1c12a0d8 100644
--- a/test_runner/regress/test_tenants_with_remote_storage.py
+++ b/test_runner/regress/test_tenants_with_remote_storage.py
@@ -20,10 +20,12 @@ from fixtures.neon_fixtures import (
     NeonEnvBuilder,
     Postgres,
     RemoteStorageKind,
-    assert_tenant_status,
     available_remote_storages,
-    wait_for_last_record_lsn,
     wait_for_sk_commit_lsn_to_reach_remote_storage,
+)
+from fixtures.pageserver.utils import (
+    assert_tenant_status,
+    wait_for_last_record_lsn,
     wait_for_upload,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index 93fafff934..cf607f4f7b 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -1,5 +1,6 @@
 import pytest
-from fixtures.neon_fixtures import NeonEnv, PageserverApiException
+from fixtures.neon_fixtures import NeonEnv
+from fixtures.pageserver.http import PageserverApiException
 from fixtures.types import TenantId, TimelineId
 from fixtures.utils import wait_until
 
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index c4e8e7aa07..7c77e1fe59 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -14,20 +14,21 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
-    PageserverApiException,
-    PageserverHttpClient,
     PgBin,
     PortDistributor,
     Postgres,
     RemoteStorageKind,
     VanillaPostgres,
-    assert_tenant_status,
     wait_for_last_flush_lsn,
+)
+from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
+from fixtures.pageserver.utils import (
+    assert_tenant_status,
     wait_for_upload_queue_empty,
-    wait_until,
+    wait_until_tenant_active,
 )
 from fixtures.types import TenantId, TimelineId
-from fixtures.utils import get_timeline_dir_size
+from fixtures.utils import get_timeline_dir_size, wait_until
 
 
 def test_timeline_size(neon_simple_env: NeonEnv):
@@ -246,12 +247,7 @@ def test_timeline_initial_logical_size_calculation_cancellation(
         extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"}
     )
 
-    def tenant_active():
-        all_states = client.tenant_list()
-        [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
-        assert tenant["state"] == "Active"
-
-    wait_until(30, 1, tenant_active)
+    wait_until_tenant_active(client, tenant_id)
 
     # kick off initial size calculation task (the response we get here is the estimated size)
     def assert_size_calculation_not_done():
@@ -425,7 +421,7 @@ def test_timeline_physical_size_post_compaction(
     pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
 
     if remote_storage_kind is not None:
-        wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, new_timeline_id)
+        wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
 
     assert_physical_size_invariants(
         get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
@@ -478,7 +474,7 @@ def test_timeline_physical_size_post_gc(
     pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
 
     if remote_storage_kind is not None:
-        wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, new_timeline_id)
+        wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
 
     assert_physical_size_invariants(
         get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
@@ -584,7 +580,7 @@ def test_tenant_physical_size(
     tenant, timeline = env.neon_cli.create_tenant()
 
     if remote_storage_kind is not None:
-        wait_for_upload_queue_empty(env.pageserver, tenant, timeline)
+        wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
 
     def get_timeline_resident_physical_size(timeline: TimelineId):
         sizes = get_physical_size_values(env, tenant, timeline, remote_storage_kind)
@@ -609,7 +605,7 @@ def test_tenant_physical_size(
         pageserver_http.timeline_checkpoint(tenant, timeline)
 
         if remote_storage_kind is not None:
-            wait_for_upload_queue_empty(env.pageserver, tenant, timeline)
+            wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
 
         timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)
 
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 407085a01a..306c492e8f 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -30,9 +30,8 @@
 from fixtures.neon_fixtures import (
     SafekeeperHttpClient,
     SafekeeperPort,
     available_remote_storages,
-    wait_for_last_record_lsn,
-    wait_for_upload,
 )
+from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import get_dir_size, query_scalar, start_in_background
diff --git a/test_runner/regress/test_walredo_not_left_behind_on_detach.py b/test_runner/regress/test_walredo_not_left_behind_on_detach.py
index 395d54b8c3..d6302f8632 100644
--- a/test_runner/regress/test_walredo_not_left_behind_on_detach.py
+++ b/test_runner/regress/test_walredo_not_left_behind_on_detach.py
@@ -3,7 +3,8 @@ import time
 import psutil
 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException
+from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.pageserver.http import PageserverApiException
 from fixtures.types import TenantId
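
For readers tracing the call-site changes above: wait_until_tenant_active is
defined in the new test_runner/fixtures/pageserver/utils.py, whose hunk is not
reproduced in this excerpt. The sketch below is a reconstruction from the
inline tenant_active closures the patch deletes, not a quote of the new file;
the defaults iterations=30 and interval=1 are assumptions inferred from the
old wait_until(30, 1, tenant_active) call sites.

    from fixtures.pageserver.http import PageserverHttpClient
    from fixtures.types import TenantId
    from fixtures.utils import wait_until


    def wait_until_tenant_active(
        pageserver_http: PageserverHttpClient,
        tenant_id: TenantId,
        iterations: int = 30,  # assumed default, from the old wait_until(30, 1, ...) calls
        interval: float = 1,  # assumed default polling period in seconds
    ):
        # Poll the pageserver's tenant list until this tenant reports "Active",
        # mirroring the closure that each call site used to define inline.
        def tenant_active():
            all_states = pageserver_http.tenant_list()
            [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
            assert tenant["state"] == "Active"

        wait_until(
            number_of_iterations=iterations,
            interval=interval,
            func=tenant_active,
        )

Hoisting the copy-pasted polling closure into one helper keeps the retry
policy in a single place; call sites now pass only what varies, e.g.
wait_until_tenant_active(client, tenant_id) for the defaults, or
wait_until_tenant_active(pageserver_http=client, tenant_id=tenant_id,
iterations=5) where a shorter wait suffices.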