test: update fixtures for sharding/attachment service

John Spray
2024-01-02 14:21:52 +00:00
parent 150c3c79a5
commit c570c78816
5 changed files with 231 additions and 56 deletions

View File

@@ -16,6 +16,7 @@ class Metrics:
     def query_all(self, name: str, filter: Optional[Dict[str, str]] = None) -> List[Sample]:
         filter = filter or {}
         res = []
         for sample in self.metrics[name]:
             try:
                 if all(sample.labels[k] == v for k, v in filter.items()):
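For orientation, a minimal usage sketch (not part of the diff): `query_all` returns every sample of a named metric whose labels match the filter, and the try block above presumably tolerates samples that lack a filtered label. The metric name below is illustrative.

# `metrics` is a Metrics object, e.g. as returned by PageserverHttpClient.get_metrics()
samples = metrics.query_all(
    "pageserver_resident_physical_size",   # illustrative metric name
    filter={"tenant_id": str(tenant_id)},  # keep only samples whose labels match
)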

View File

@@ -60,7 +60,7 @@ from fixtures.remote_storage import (
     default_remote_storage,
     remote_storage_to_toml_inline_table,
 )
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
 from fixtures.utils import (
     ATTACHMENT_NAME_REGEX,
     allure_add_grafana_links,
@@ -490,6 +490,8 @@ class NeonEnvBuilder:
         self,
         initial_tenant_conf: Optional[Dict[str, str]] = None,
         default_remote_storage_if_missing: bool = True,
+        initial_tenant_shard_count: Optional[int] = None,
+        initial_tenant_shard_stripe_size: Optional[int] = None,
     ) -> NeonEnv:
         """
         Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline.
@@ -507,7 +509,11 @@ class NeonEnvBuilder:
             f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
         )
         initial_tenant, initial_timeline = env.neon_cli.create_tenant(
-            tenant_id=env.initial_tenant, conf=initial_tenant_conf, timeline_id=env.initial_timeline
+            tenant_id=env.initial_tenant,
+            conf=initial_tenant_conf,
+            timeline_id=env.initial_timeline,
+            shard_count=initial_tenant_shard_count,
+            shard_stripe_size=initial_tenant_shard_stripe_size,
         )
         assert env.initial_tenant == initial_tenant
         assert env.initial_timeline == initial_timeline
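A usage sketch for the new parameters, assuming the enclosing method is the builder's usual init_start (its name is elided from this hunk) and with illustrative shard values:

env = neon_env_builder.init_start(
    initial_tenant_shard_count=4,           # create the initial tenant with 4 shards
    initial_tenant_shard_stripe_size=8192,  # illustrative stripe size
)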
@@ -1130,15 +1136,29 @@ class AbstractNeonCli(abc.ABC):
                 env_vars[var] = val

         # Intercept CalledProcessError and print more info
-        res = subprocess.run(
-            args,
-            env=env_vars,
-            check=False,
-            universal_newlines=True,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            timeout=timeout,
-        )
+        try:
+            res = subprocess.run(
+                args,
+                env=env_vars,
+                check=False,
+                universal_newlines=True,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                timeout=timeout,
+            )
+        except subprocess.TimeoutExpired as e:
+            if e.stderr:
+                stderr = e.stderr.decode(errors="replace")
+            else:
+                stderr = ""
+            if e.stdout:
+                stdout = e.stdout.decode(errors="replace")
+            else:
+                stdout = ""
+            log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
+            raise

         indent = " "
         if not res.returncode:
@@ -1189,6 +1209,8 @@ class NeonCli(AbstractNeonCli):
         tenant_id: Optional[TenantId] = None,
         timeline_id: Optional[TimelineId] = None,
         conf: Optional[Dict[str, str]] = None,
+        shard_count: Optional[int] = None,
+        shard_stripe_size: Optional[int] = None,
         set_default: bool = False,
     ) -> Tuple[TenantId, TimelineId]:
         """
@@ -1216,6 +1238,12 @@ class NeonCli(AbstractNeonCli):
         if set_default:
             args.append("--set-default")
+        if shard_count is not None:
+            args.extend(["--shard-count", str(shard_count)])
+        if shard_stripe_size is not None:
+            args.extend(["--shard-stripe-size", str(shard_stripe_size)])

         res = self.raw_cli(args)
         res.check_returncode()
         return tenant_id, timeline_id
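The new keyword arguments map one-to-one onto the `--shard-count` and `--shard-stripe-size` flags added above; a hedged example call:

tenant_id, timeline_id = env.neon_cli.create_tenant(
    shard_count=4,           # forwarded as --shard-count 4
    shard_stripe_size=8192,  # forwarded as --shard-stripe-size 8192 (illustrative value)
)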
@@ -1536,6 +1564,19 @@ class NeonCli(AbstractNeonCli):
         return self.raw_cli(args, check_return_code=True)

+    def tenant_migrate(
+        self, tenant_shard_id: TenantShardId, new_pageserver: int, timeout_secs: Optional[int]
+    ):
+        args = [
+            "tenant",
+            "migrate",
+            "--tenant-id",
+            str(tenant_shard_id),
+            "--id",
+            str(new_pageserver),
+        ]
+        return self.raw_cli(args, check_return_code=True, timeout=timeout_secs)
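A sketch of driving the new migration helper, assuming TenantShardId takes (tenant, shard_number, shard_count) as in tenant_get_shards later in this diff; the shard index, shard count, pageserver id, and timeout are illustrative:

# Move shard 0 of a 4-shard tenant to pageserver 2, waiting up to 30 seconds.
env.neon_cli.tenant_migrate(TenantShardId(tenant_id, 0, 4), 2, timeout_secs=30)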
     def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
         return self.raw_cli(["start"], check_return_code=check_return_code)
@@ -1631,6 +1672,66 @@ class NeonAttachmentService:
         else:
             return None

+    def node_register(self, node: NeonPageserver):
+        body = {
+            "node_id": int(node.id),
+            "listen_http_addr": "localhost",
+            "listen_http_port": node.service_port.http,
+        }
+        log.info(f"node_register({body})")
+        requests.post(f"{self.env.control_plane_api}/node", json=body).raise_for_status()
+
+    def tenant_create(
+        self,
+        tenant_id: TenantId,
+        shard_count: Optional[int] = None,
+        shard_stripe_size: Optional[int] = None,
+        tenant_config: Optional[Dict[Any, Any]] = None,
+    ):
+        body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)}
+        if shard_count is not None:
+            shard_params = {"count": shard_count}
+            if shard_stripe_size is not None:
+                shard_params["stripe_size"] = shard_stripe_size
+            body["shard_parameters"] = shard_params
+        if tenant_config is not None:
+            for k, v in tenant_config.items():
+                body[k] = v
+
+        response = requests.post(f"{self.env.control_plane_api}/tenant", json=body)
+        response.raise_for_status()
+        log.info(f"tenant_create success: {response.json()}")
+
+    def tenant_timeline_create(self, tenant_id: TenantId, timeline_id: TimelineId):
+        body: Dict[str, Any] = {"new_timeline_id": str(timeline_id)}
+        response = requests.post(
+            f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body
+        )
+        response.raise_for_status()
+        log.info(f"tenant_timeline_create success: {response.json()}")
+
+    def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
+        response = requests.get(f"{self.env.control_plane_api}/tenant/{tenant_id}/locate")
+        response.raise_for_status()
+        body = response.json()
+        shards: list[dict[str, Any]] = body["shards"]
+        return shards
+
+    def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
+        response = requests.put(
+            f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split",
+            json={"new_shard_count": shard_count},
+        )
+        response.raise_for_status()
+        body = response.json()
+        log.info(f"tenant_shard_split success: {body}")
+        shards: list[TenantShardId] = body["new_shards"]
+        return shards
+
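Taken together, these methods let a test drive the attachment service end to end; a minimal sketch with illustrative shard counts:

svc = env.attachment_service
svc.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
for shard in svc.locate(tenant_id):  # each entry carries "shard_id" and "node_id"
    log.info(f"shard {shard['shard_id']} is on pageserver {shard['node_id']}")
new_shards = svc.tenant_shard_split(tenant_id, shard_count=4)  # split 2 shards into 4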
     def __enter__(self) -> "NeonAttachmentService":
         return self
@@ -3201,7 +3302,7 @@ def pytest_addoption(parser: Parser):
 SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql)"
+    r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
 )
@@ -3297,9 +3398,7 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
 # pg is the existing and running compute node, that we want to compare with a basebackup
-def check_restored_datadir_content(
-    test_output_dir: Path, env: NeonEnv, endpoint: Endpoint, pageserver_id: Optional[int] = None
-):
+def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
     # Get the timeline ID. We need it for the 'basebackup' command
     timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
@@ -3320,6 +3419,7 @@ def check_restored_datadir_content(
     pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
     psql_path = os.path.join(pg_bin.pg_bin_path, "psql")

+    pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]

     cmd = rf"""
         {psql_path} \
             --no-psqlrc \
@@ -3388,6 +3488,27 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -
         time.sleep(0.5)

+def tenant_get_shards(
+    env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
+) -> list[tuple[TenantShardId, NeonPageserver]]:
+    """
+    Helper for when you want to talk to one or more pageservers, and the
+    caller _might_ have specified a pageserver, or they might leave it to
+    us to figure out the shards for a tenant.
+
+    The caller should iterate over the response to apply their per-pageserver
+    action to each shard.
+    """
+    if len(env.pageservers) > 1:
+        return [
+            (TenantShardId.parse(s["shard_id"]), env.get_pageserver(s["node_id"]))
+            for s in env.attachment_service.locate(tenant_id)
+        ]
+    else:
+        # Assume an unsharded tenant
+        return [(TenantShardId(tenant_id, 0, 0), env.pageserver)]
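The intended call pattern is to iterate and apply a per-pageserver action to each shard, exactly as `last_flush_lsn_upload` below does; the checkpoint call here is just an example action:

for tenant_shard_id, pageserver in tenant_get_shards(env, tenant_id):
    pageserver.http_client().timeline_checkpoint(tenant_shard_id, timeline_id)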
 def wait_for_last_flush_lsn(
     env: NeonEnv,
     endpoint: Endpoint,
@@ -3397,10 +3518,22 @@ def wait_for_last_flush_lsn(
 ) -> Lsn:
     """Wait for the pageserver to catch up to the latest flush LSN; returns the last observed LSN."""
+    shards = tenant_get_shards(env, tenant, pageserver_id)
     last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
-    return wait_for_last_record_lsn(
-        env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
-    )
+    results = []
+    for tenant_shard_id, pageserver in shards:
+        log.info(f"wait_for_last_flush_lsn: shard {tenant_shard_id}")
+        waited = wait_for_last_record_lsn(
+            pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
+        )
+        assert waited >= last_flush_lsn
+        results.append(waited)
+
+    # Return the lowest LSN that has been ingested by all shards
+    return min(results)
 def wait_for_wal_insert_lsn(
@@ -3412,9 +3545,16 @@ def wait_for_wal_insert_lsn(
 ) -> Lsn:
     """Wait for the pageserver to catch up to the latest insert LSN; returns the last observed LSN."""
     last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
-    return wait_for_last_record_lsn(
-        env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
-    )
+    result = None
+    for tenant_shard_id, pageserver in tenant_get_shards(env, tenant, pageserver_id):
+        shard_r = wait_for_last_record_lsn(
+            pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
+        )
+        if result is None:
+            result = shard_r
+
+    assert result is not None
+    return result
 def fork_at_current_lsn(
@@ -3448,11 +3588,13 @@ def last_flush_lsn_upload(
     last_flush_lsn = wait_for_last_flush_lsn(
         env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id
     )
-    ps_http = env.get_pageserver(pageserver_id).http_client()
-    wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
-    # force a checkpoint to trigger upload
-    ps_http.timeline_checkpoint(tenant_id, timeline_id)
-    wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
+    shards = tenant_get_shards(env, tenant_id, pageserver_id)
+    for tenant_shard_id, pageserver in shards:
+        ps_http = pageserver.http_client()
+        wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
+        # force a checkpoint to trigger upload
+        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
+        wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
     return last_flush_lsn

View File

@@ -13,7 +13,7 @@ from urllib3.util.retry import Retry
 from fixtures.log_helper import log
 from fixtures.metrics import Metrics, parse_metrics
 from fixtures.pg_version import PgVersion
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
 from fixtures.utils import Fn
@@ -433,7 +433,7 @@ class PageserverHttpClient(requests.Session):
     def timeline_detail(
         self,
-        tenant_id: TenantId,
+        tenant_id: TenantShardId,
         timeline_id: TimelineId,
         include_non_incremental_logical_size: bool = False,
         include_timeline_dir_layer_file_size_sum: bool = False,
@@ -455,7 +455,7 @@ class PageserverHttpClient(requests.Session):
         assert isinstance(res_json, dict)
         return res_json

-    def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
+    def timeline_delete(self, tenant_id: TenantShardId, timeline_id: TimelineId, **kwargs):
         """
         Note that deletion is not instant, it is scheduled and performed mostly in the background.
         So if you need to wait for it to complete use `timeline_delete_wait_completed`.
@@ -469,7 +469,7 @@ class PageserverHttpClient(requests.Session):
         assert res_json is None

     def timeline_gc(
-        self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
+        self, tenant_id: TenantShardId, timeline_id: TimelineId, gc_horizon: Optional[int]
     ) -> dict[str, Any]:
         """
         Unlike most handlers, this will wait for the layers to be actually
@@ -540,7 +540,7 @@ class PageserverHttpClient(requests.Session):
         return res_json

     def timeline_checkpoint(
-        self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
+        self, tenant_id: TenantShardId, timeline_id: TimelineId, force_repartition=False
     ):
         self.is_testing_enabled_or_skip()
         query = {}
@@ -682,6 +682,34 @@ class PageserverHttpClient(requests.Session):
         assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
         return results[0].value

+    def get_metrics_values(
+        self, names: list[str], filter: Optional[Dict[str, str]] = None
+    ) -> Dict[str, float]:
+        """
+        When fetching multiple named metrics, it is more efficient to use this
+        than to call `get_metric_value` repeatedly.
+
+        Throws RuntimeError if no metrics matching `names` are found, or if
+        not all of `names` are found: this method is intended for loading sets
+        of metrics whose existence is coupled.
+        """
+        metrics = self.get_metrics()
+        samples = []
+        for name in names:
+            samples.extend(metrics.query_all(name, filter=filter))
+
+        result = {}
+        for sample in samples:
+            if sample.name in result:
+                raise RuntimeError(f"Multiple values found for {sample.name}")
+            result[sample.name] = sample.value
+
+        if len(result) != len(names):
+            log.info(f"Metrics found: {metrics.metrics}")
+            raise RuntimeError(f"could not find all metrics {' '.join(names)}")
+
+        return result
+
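A hedged usage sketch; the metric names are illustrative, not taken from this diff:

values = pageserver_http.get_metrics_values(
    ["pageserver_metric_a_total", "pageserver_metric_b_total"],  # hypothetical names
    filter={"tenant_id": str(tenant_id)},
)
# Returns a dict keyed by metric name; raises RuntimeError unless every name matched exactly once.
log.info(f"a={values['pageserver_metric_a_total']}, b={values['pageserver_metric_b_total']}")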
     def layer_map_info(
         self,
         tenant_id: TenantId,

View File

@@ -6,7 +6,7 @@ from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef
 from fixtures.log_helper import log
 from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
 from fixtures.remote_storage import RemoteStorageKind, S3Storage
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
 from fixtures.utils import wait_until

@@ -22,7 +22,7 @@ def assert_tenant_state(
 def remote_consistent_lsn(
-    pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
+    pageserver_http: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId
 ) -> Lsn:
     detail = pageserver_http.timeline_detail(tenant, timeline)

@@ -39,7 +39,7 @@ def remote_consistent_lsn(
 def wait_for_upload(
     pageserver_http: PageserverHttpClient,
-    tenant: TenantId,
+    tenant: TenantShardId,
     timeline: TimelineId,
     lsn: Lsn,
 ):

@@ -92,7 +92,7 @@ def wait_until_tenant_state(
 def wait_until_timeline_state(
     pageserver_http: PageserverHttpClient,
-    tenant_id: TenantId,
+    tenant_id: TenantShardId,
     timeline_id: TimelineId,
     expected_state: str,
     iterations: int,

@@ -141,7 +141,7 @@ def wait_until_tenant_active(
 def last_record_lsn(
-    pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
+    pageserver_http_client: PageserverHttpClient, tenant: TenantShardId, timeline: TimelineId
 ) -> Lsn:
     detail = pageserver_http_client.timeline_detail(tenant, timeline)

@@ -152,7 +152,7 @@ def last_record_lsn(
 def wait_for_last_record_lsn(
     pageserver_http: PageserverHttpClient,
-    tenant: TenantId,
+    tenant: TenantShardId,
     timeline: TimelineId,
     lsn: Lsn,
 ) -> Lsn:

@@ -194,7 +194,7 @@ def wait_for_upload_queue_empty(
 def wait_timeline_detail_404(
     pageserver_http: PageserverHttpClient,
-    tenant_id: TenantId,
+    tenant_id: TenantShardId,
     timeline_id: TimelineId,
     iterations: int,
     interval: Optional[float] = None,

@@ -219,7 +219,7 @@ def wait_timeline_detail_404(
 def timeline_delete_wait_completed(
     pageserver_http: PageserverHttpClient,
-    tenant_id: TenantId,
+    tenant_id: TenantShardId,
     timeline_id: TimelineId,
     iterations: int = 20,
     interval: Optional[float] = None,

View File

@@ -5,6 +5,7 @@ from fixtures.neon_fixtures import (
     Endpoint,
     NeonEnv,
     last_flush_lsn_upload,
+    tenant_get_shards,
     wait_for_last_flush_lsn,
 )
 from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload

@@ -31,7 +32,7 @@ class Workload:
         self._endpoint: Optional[Endpoint] = None

-    def endpoint(self, pageserver_id: int) -> Endpoint:
+    def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
         if self._endpoint is None:
             self._endpoint = self.env.endpoints.create(
                 "main",

@@ -54,7 +55,7 @@ class Workload:
         if self._endpoint is not None:
             self._endpoint.stop()

-    def init(self, pageserver_id: int):
+    def init(self, pageserver_id: Optional[int] = None):
         endpoint = self.endpoint(pageserver_id)

         endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")

@@ -63,7 +64,7 @@ class Workload:
             self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
         )

-    def write_rows(self, n, pageserver_id):
+    def write_rows(self, n, pageserver_id: Optional[int] = None):
         endpoint = self.endpoint(pageserver_id)

         start = self.expect_rows
         end = start + n - 1

@@ -81,7 +82,7 @@ class Workload:
             self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
         )

-    def churn_rows(self, n, pageserver_id, upload=True):
+    def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
         assert self.expect_rows >= n

         max_iters = 10
@@ -119,21 +120,24 @@ class Workload:
             ]
         )

-        last_flush_lsn = wait_for_last_flush_lsn(
-            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
-        )
-        ps_http = self.env.get_pageserver(pageserver_id).http_client()
-        wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
+        for tenant_shard_id, pageserver in tenant_get_shards(
+            self.env, self.tenant_id, pageserver_id
+        ):
+            last_flush_lsn = wait_for_last_flush_lsn(
+                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+            )
+            ps_http = pageserver.http_client()
+            wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)

-        if upload:
-            # force a checkpoint to trigger upload
-            ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
-            wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
-            log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
-        else:
-            log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
+            if upload:
+                # force a checkpoint to trigger upload
+                ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
+                wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
+                log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
+            else:
+                log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

-    def validate(self, pageserver_id):
+    def validate(self, pageserver_id: Optional[int] = None):
         endpoint = self.endpoint(pageserver_id)

         result = endpoint.safe_psql_many(
             [
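For orientation, a sketch of how a sharding test can now drive Workload without naming a pageserver; the constructor signature is assumed, since it is not shown in this diff:

workload = Workload(env, tenant_id, timeline_id)  # assumed constructor
workload.init()          # pageserver_id defaults to None everywhere now
workload.write_rows(100)
workload.churn_rows(50)  # waits for ingest (and upload) on every shard
workload.validate()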