diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 724ba055c3..4256e8194e 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -4,7 +4,7 @@ import json import time from collections import defaultdict from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Union import requests from requests.adapters import HTTPAdapter @@ -211,7 +211,7 @@ class PageserverHttpClient(requests.Session): def tenant_create( self, - new_tenant_id: TenantId, + new_tenant_id: Union[TenantId, TenantShardId], conf: Optional[Dict[str, Any]] = None, generation: Optional[int] = None, ) -> TenantId: @@ -239,7 +239,7 @@ class PageserverHttpClient(requests.Session): def tenant_attach( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], config: None | Dict[str, Any] = None, config_null: bool = False, generation: Optional[int] = None, @@ -269,7 +269,7 @@ class PageserverHttpClient(requests.Session): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params) self.verbose_error(res) - def tenant_reset(self, tenant_id: TenantId, drop_cache: bool): + def tenant_reset(self, tenant_id: Union[TenantId, TenantShardId], drop_cache: bool): params = {} if drop_cache: params["drop_cache"] = "true" @@ -278,7 +278,7 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) def tenant_location_conf( - self, tenant_id: TenantId, location_conf=dict[str, Any], flush_ms=None + self, tenant_id: Union[TenantId, TenantShardId], location_conf=dict[str, Any], flush_ms=None ): body = location_conf.copy() body["tenant_id"] = str(tenant_id) @@ -294,7 +294,7 @@ class PageserverHttpClient(requests.Session): ) self.verbose_error(res) - def tenant_delete(self, tenant_id: TenantId): + def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]): res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") self.verbose_error(res) return res @@ -310,27 +310,27 @@ class PageserverHttpClient(requests.Session): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore") self.verbose_error(res) - def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]: + def tenant_status(self, tenant_id: Union[TenantId, TenantShardId]) -> Dict[Any, Any]: res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") self.verbose_error(res) res_json = res.json() assert isinstance(res_json, dict) return res_json - def tenant_config(self, tenant_id: TenantId) -> TenantConfig: + def tenant_config(self, tenant_id: Union[TenantId, TenantShardId]) -> TenantConfig: res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config") self.verbose_error(res) return TenantConfig.from_json(res.json()) - def tenant_heatmap_upload(self, tenant_id: TenantId): + def tenant_heatmap_upload(self, tenant_id: Union[TenantId, TenantShardId]): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload") self.verbose_error(res) - def tenant_secondary_download(self, tenant_id: TenantId): + def tenant_secondary_download(self, tenant_id: Union[TenantId, TenantShardId]): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download") self.verbose_error(res) - def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): + def set_tenant_config(self, tenant_id: Union[TenantId, TenantShardId], config: dict[str, Any]): assert "tenant_id" not in config.keys() res = self.put( f"http://localhost:{self.port}/v1/tenant/config", @@ -352,10 +352,12 @@ class PageserverHttpClient(requests.Session): del current[key] self.set_tenant_config(tenant_id, current) - def tenant_size(self, tenant_id: TenantId) -> int: + def tenant_size(self, tenant_id: Union[TenantId, TenantShardId]) -> int: return self.tenant_size_and_modelinputs(tenant_id)[0] - def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]: + def tenant_size_and_modelinputs( + self, tenant_id: Union[TenantId, TenantShardId] + ) -> Tuple[int, Dict[str, Any]]: """ Returns the tenant size, together with the model inputs as the second tuple item. """ @@ -370,7 +372,7 @@ class PageserverHttpClient(requests.Session): assert isinstance(inputs, dict) return (size, inputs) - def tenant_size_debug(self, tenant_id: TenantId) -> str: + def tenant_size_debug(self, tenant_id: Union[TenantId, TenantShardId]) -> str: """ Returns the tenant size debug info, as an HTML string """ @@ -382,7 +384,7 @@ class PageserverHttpClient(requests.Session): def timeline_list( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], include_non_incremental_logical_size: bool = False, include_timeline_dir_layer_file_size_sum: bool = False, ) -> List[Dict[str, Any]]: @@ -403,7 +405,7 @@ class PageserverHttpClient(requests.Session): def timeline_create( self, pg_version: PgVersion, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], new_timeline_id: TimelineId, ancestor_timeline_id: Optional[TimelineId] = None, ancestor_start_lsn: Optional[Lsn] = None, @@ -437,7 +439,7 @@ class PageserverHttpClient(requests.Session): def timeline_detail( self, - tenant_id: TenantShardId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, include_non_incremental_logical_size: bool = False, include_timeline_dir_layer_file_size_sum: bool = False, @@ -459,7 +461,9 @@ class PageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def timeline_delete(self, tenant_id: TenantShardId, timeline_id: TimelineId, **kwargs): + def timeline_delete( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, **kwargs + ): """ Note that deletion is not instant, it is scheduled and performed mostly in the background. So if you need to wait for it to complete use `timeline_delete_wait_completed`. @@ -473,7 +477,10 @@ class PageserverHttpClient(requests.Session): assert res_json is None def timeline_gc( - self, tenant_id: TenantShardId, timeline_id: TimelineId, gc_horizon: Optional[int] + self, + tenant_id: Union[TenantId, TenantShardId], + timeline_id: TimelineId, + gc_horizon: Optional[int], ) -> dict[str, Any]: """ Unlike most handlers, this will wait for the layers to be actually @@ -496,7 +503,10 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_compact( - self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False + self, + tenant_id: Union[TenantId, TenantShardId], + timeline_id: TimelineId, + force_repartition=False, ): self.is_testing_enabled_or_skip() query = {} @@ -515,7 +525,7 @@ class PageserverHttpClient(requests.Session): def timeline_get_lsn_by_timestamp( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, timestamp, version: Optional[int] = None, @@ -534,7 +544,9 @@ class PageserverHttpClient(requests.Session): res_json = res.json() return res_json - def timeline_get_timestamp_of_lsn(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn): + def timeline_get_timestamp_of_lsn( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, lsn: Lsn + ): log.info(f"Requesting time range of lsn {lsn}, tenant {tenant_id}, timeline {timeline_id}") res = self.get( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_timestamp_of_lsn?lsn={lsn}", @@ -544,7 +556,10 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_checkpoint( - self, tenant_id: TenantShardId, timeline_id: TimelineId, force_repartition=False + self, + tenant_id: Union[TenantId, TenantShardId], + timeline_id: TimelineId, + force_repartition=False, ): self.is_testing_enabled_or_skip() query = {} @@ -563,7 +578,7 @@ class PageserverHttpClient(requests.Session): def timeline_spawn_download_remote_layers( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, max_concurrent_downloads: int, ) -> dict[str, Any]: @@ -582,7 +597,7 @@ class PageserverHttpClient(requests.Session): def timeline_poll_download_remote_layers_status( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, spawn_response: dict[str, Any], poll_state=None, @@ -604,7 +619,7 @@ class PageserverHttpClient(requests.Session): def timeline_download_remote_layers( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, max_concurrent_downloads: int, errors_ok=False, @@ -716,7 +731,7 @@ class PageserverHttpClient(requests.Session): def layer_map_info( self, - tenant_id: TenantId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, ) -> LayerMapInfo: res = self.get( @@ -725,7 +740,9 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) return LayerMapInfo.from_json(res.json()) - def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str): + def download_layer( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str + ): res = self.get( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}", ) @@ -733,14 +750,18 @@ class PageserverHttpClient(requests.Session): assert res.status_code == 200 - def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId): + def download_all_layers( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ): info = self.layer_map_info(tenant_id, timeline_id) for layer in info.historic_layers: if not layer.remote: continue self.download_layer(tenant_id, timeline_id, layer.layer_file_name) - def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str): + def evict_layer( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str + ): res = self.delete( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}", ) @@ -748,7 +769,7 @@ class PageserverHttpClient(requests.Session): assert res.status_code in (200, 304) - def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId): + def evict_all_layers(self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId): info = self.layer_map_info(tenant_id, timeline_id) for layer in info.historic_layers: self.evict_layer(tenant_id, timeline_id, layer.layer_file_name) @@ -761,7 +782,7 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) return res.json() - def tenant_break(self, tenant_id: TenantId): + def tenant_break(self, tenant_id: Union[TenantId, TenantShardId]): res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break") self.verbose_error(res) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 0dd771568a..d0bb566408 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,5 +1,5 @@ import time -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef @@ -22,7 +22,9 @@ def assert_tenant_state( def remote_consistent_lsn( - pageserver_http: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId + pageserver_http: PageserverHttpClient, + tenant: Union[TenantId, TenantShardId], + timeline: TimelineId, ) -> Lsn: detail = pageserver_http.timeline_detail(tenant, timeline) @@ -39,7 +41,7 @@ def remote_consistent_lsn( def wait_for_upload( pageserver_http: PageserverHttpClient, - tenant: TenantShardId, + tenant: Union[TenantId, TenantShardId], timeline: TimelineId, lsn: Lsn, ): @@ -92,7 +94,7 @@ def wait_until_tenant_state( def wait_until_timeline_state( pageserver_http: PageserverHttpClient, - tenant_id: TenantShardId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, expected_state: str, iterations: int, @@ -141,7 +143,9 @@ def wait_until_tenant_active( def last_record_lsn( - pageserver_http_client: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId + pageserver_http_client: PageserverHttpClient, + tenant: Union[TenantId, TenantShardId], + timeline: TimelineId, ) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) @@ -152,7 +156,7 @@ def last_record_lsn( def wait_for_last_record_lsn( pageserver_http: PageserverHttpClient, - tenant: TenantShardId, + tenant: Union[TenantId, TenantShardId], timeline: TimelineId, lsn: Lsn, ) -> Lsn: @@ -194,7 +198,7 @@ def wait_for_upload_queue_empty( def wait_timeline_detail_404( pageserver_http: PageserverHttpClient, - tenant_id: TenantShardId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, iterations: int, interval: Optional[float] = None, @@ -219,7 +223,7 @@ def wait_timeline_detail_404( def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, - tenant_id: TenantShardId, + tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, iterations: int = 20, interval: Optional[float] = None,