## Problem

The code for tenant create and tenant attach was just a special case of what `upsert_location` does.

## Summary of changes

- Use `upsert_location` for the create and attach APIs
- Clean up error handling in `upsert_location` so that it can generate appropriate HTTP response codes
- Update tests that asserted the old, non-idempotent behavior of attach
- Rework the `test_ignore_while_attaching` test, and fix tenant shutdown during activation, which this test was supposed to cover but was actually just waiting for activation to complete
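
To make the consolidation concrete, here is a minimal sketch of the idea (illustrative names only, not the actual pageserver code): once tenant state is managed by a single idempotent location upsert, create and attach are just two callers of it.

```python
# Toy model: the real upsert_location also manages on-disk state, generations,
# and activation; this only shows the control flow being unified.
tenants: dict[str, dict] = {}


def upsert_location(tenant_id: str, location_conf: dict) -> None:
    # Creating a missing tenant and reconfiguring an existing one are the
    # same write, so a retried request converges on the same state instead
    # of failing with "already exists".
    tenants[tenant_id] = location_conf


def tenant_create(tenant_id: str, tenant_conf: dict) -> None:
    upsert_location(tenant_id, {"attached": True, "tenant_conf": tenant_conf})


def tenant_attach(tenant_id: str, tenant_conf: dict) -> None:
    # Attach differs only in that the tenant's data already exists in
    # remote storage; the location upsert itself is identical.
    upsert_location(tenant_id, {"attached": True, "tenant_conf": tenant_conf})
```

Seen this way, repeated create/attach requests naturally map onto success responses rather than conflicts, which is what the error-handling cleanup surfaces over HTTP.
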
344 lines · 13 KiB · Python

import os
import time
from contextlib import closing
from datetime import datetime
from itertools import chain
from pathlib import Path
from typing import List

import pytest
from fixtures.log_helper import log
from fixtures.metrics import (
    PAGESERVER_GLOBAL_METRICS,
    PAGESERVER_PER_TENANT_METRICS,
    parse_metrics,
)
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample


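# Failpoints are named hooks compiled into the pageserver; setting one to
# "return" forces the surrounding code path to fail, which lets the test
# below exercise error handling deterministically.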
def test_tenant_creation_fails(neon_simple_env: NeonEnv):
    tenants_dir = neon_simple_env.pageserver.tenant_dir()
    initial_tenants = sorted(
        map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
    )
    [d for d in tenants_dir.iterdir()]

    neon_simple_env.pageserver.allowed_errors.append(".*tenant-config-before-write.*")

    pageserver_http = neon_simple_env.pageserver.http_client()
    pageserver_http.configure_failpoints(("tenant-config-before-write", "return"))
    with pytest.raises(Exception, match="tenant-config-before-write"):
        _ = neon_simple_env.neon_cli.create_tenant()

    new_tenants = sorted(
        map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
    )
    assert initial_tenants == new_tenants, "should not create new tenants"

    # Any files left behind on disk during failed creation do not prevent
    # a retry from succeeding.
    pageserver_http.configure_failpoints(("tenant-config-before-write", "off"))
    neon_simple_env.neon_cli.create_tenant()


def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
    """Tests tenants with and without wal acceptors"""
    neon_env_builder.num_safekeepers = 3

    env = neon_env_builder.init_start()
    tenant_1, _ = env.neon_cli.create_tenant()
    tenant_2, _ = env.neon_cli.create_tenant()

    env.neon_cli.create_timeline("test_tenants_normal_work", tenant_id=tenant_1)
    env.neon_cli.create_timeline("test_tenants_normal_work", tenant_id=tenant_2)

    endpoint_tenant1 = env.endpoints.create_start(
        "test_tenants_normal_work",
        tenant_id=tenant_1,
    )
    endpoint_tenant2 = env.endpoints.create_start(
        "test_tenants_normal_work",
        tenant_id=tenant_2,
    )

    for endpoint in [endpoint_tenant1, endpoint_tenant2]:
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                # we rely upon autocommit after each statement
                # as waiting for acceptors happens there
                cur.execute("CREATE TABLE t(key int primary key, value text)")
                cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
                cur.execute("SELECT sum(key) FROM t")
                assert cur.fetchone() == (5000050000,)


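# Verifies pageserver and safekeeper metrics end to end. The pageserver is
# configured with availability_zone='test_ps_az'; safekeepers label pageserver
# connections with the client's AZ, which the cross-AZ I/O checks below use.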
def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.num_safekeepers = 3
    neon_env_builder.pageserver_config_override = "availability_zone='test_ps_az'"

    env = neon_env_builder.init_start()
    tenant_1, _ = env.neon_cli.create_tenant()
    tenant_2, _ = env.neon_cli.create_tenant()

    timeline_1 = env.neon_cli.create_timeline("test_metrics_normal_work", tenant_id=tenant_1)
    timeline_2 = env.neon_cli.create_timeline("test_metrics_normal_work", tenant_id=tenant_2)

    endpoint_tenant1 = env.endpoints.create_start("test_metrics_normal_work", tenant_id=tenant_1)
    endpoint_tenant2 = env.endpoints.create_start("test_metrics_normal_work", tenant_id=tenant_2)

    for endpoint in [endpoint_tenant1, endpoint_tenant2]:
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE t(key int primary key, value text)")
                cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
                cur.execute("SELECT sum(key) FROM t")
                assert cur.fetchone() == (5000050000,)

    collected_metrics = {
        "pageserver": env.pageserver.http_client().get_metrics_str(),
    }
    for sk in env.safekeepers:
        collected_metrics[f"safekeeper{sk.id}"] = sk.http_client().get_metrics_str()

    for name in collected_metrics:
        basepath = os.path.join(neon_env_builder.repo_dir, f"{name}.metrics")

        with open(basepath, "w") as stdout_f:
            print(collected_metrics[name], file=stdout_f, flush=True)

    all_metrics = [parse_metrics(m, name) for name, m in collected_metrics.items()]
    ps_metrics = all_metrics[0]
    sk_metrics = all_metrics[1:]

    # Find all metrics among all safekeepers, accepts the same arguments as query_all()
    def query_all_safekeepers(name, filter):
        return list(
            chain.from_iterable(
                map(
                    lambda sk: sk.query_all(name, filter),
                    sk_metrics,
                )
            )
        )

    ttids = [
        {"tenant_id": str(tenant_1), "timeline_id": str(timeline_1)},
        {"tenant_id": str(tenant_2), "timeline_id": str(timeline_2)},
    ]

    # Test metrics per timeline
    for tt in ttids:
        log.info(f"Checking metrics for {tt}")

        ps_lsn = Lsn(int(ps_metrics.query_one("pageserver_last_record_lsn", filter=tt).value))
        sk_lsns = [
            Lsn(int(sk.query_one("safekeeper_commit_lsn", filter=tt).value)) for sk in sk_metrics
        ]

        log.info(f"ps_lsn: {ps_lsn}")
        log.info(f"sk_lsns: {sk_lsns}")

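        # The pageserver ingests WAL from the safekeepers, so its last-record
        # LSN may trail, but never exceed, the highest safekeeper commit LSN;
        # a nonzero LSN confirms that some WAL was actually ingested.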
assert ps_lsn <= max(sk_lsns)
|
|
assert ps_lsn > Lsn(0)
|
|
|
|
# Test common metrics
|
|
for metrics in all_metrics:
|
|
log.info(f"Checking common metrics for {metrics.name}")
|
|
|
|
log.info(
|
|
f"process_cpu_seconds_total: {metrics.query_one('process_cpu_seconds_total').value}"
|
|
)
|
|
log.info(f"process_threads: {int(metrics.query_one('process_threads').value)}")
|
|
log.info(
|
|
f"process_resident_memory_bytes (MB): {metrics.query_one('process_resident_memory_bytes').value / 1024 / 1024}"
|
|
)
|
|
log.info(
|
|
f"process_virtual_memory_bytes (MB): {metrics.query_one('process_virtual_memory_bytes').value / 1024 / 1024}"
|
|
)
|
|
log.info(f"process_open_fds: {int(metrics.query_one('process_open_fds').value)}")
|
|
log.info(f"process_max_fds: {int(metrics.query_one('process_max_fds').value)}")
|
|
log.info(
|
|
f"process_start_time_seconds (UTC): {datetime.fromtimestamp(metrics.query_one('process_start_time_seconds').value)}"
|
|
)
|
|
|
|
for io_direction in ["read", "write"]:
|
|
# Querying all metrics for number of bytes read/written by pageserver in another AZ
|
|
io_metrics = query_all_safekeepers(
|
|
"safekeeper_pg_io_bytes_total",
|
|
{
|
|
"app_name": "pageserver",
|
|
"client_az": "test_ps_az",
|
|
"dir": io_direction,
|
|
"same_az": "false",
|
|
},
|
|
)
|
|
total_bytes = sum(int(metric.value) for metric in io_metrics)
|
|
log.info(f"Pageserver {io_direction} bytes from another AZ: {total_bytes}")
|
|
# We expect some bytes to be read/written, to make sure metrics are working
|
|
assert total_bytes > 0
|
|
|
|
# Test (a subset of) safekeeper global metrics
|
|
for sk_m in sk_metrics:
|
|
# Test that every safekeeper has read some bytes
|
|
assert any(
|
|
map(
|
|
lambda x: x.value > 0,
|
|
sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "read"}),
|
|
)
|
|
), f"{sk_m.name} has not read bytes"
|
|
|
|
# Test that every safekeeper has written some bytes
|
|
assert any(
|
|
map(
|
|
lambda x: x.value > 0,
|
|
sk_m.query_all("safekeeper_pg_io_bytes_total", {"dir": "write"}),
|
|
)
|
|
), f"{sk_m.name} has not written bytes"
|
|
|
|
# Test (a subset of) pageserver global metrics
|
|
for metric in PAGESERVER_GLOBAL_METRICS:
|
|
if metric.startswith("pageserver_remote"):
|
|
continue
|
|
|
|
ps_samples = ps_metrics.query_all(metric, {})
|
|
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
|
|
for sample in ps_samples:
|
|
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
|
|
log.info(f"{sample.name}{{{labels}}} {sample.value}")
|
|
|
|
# Test that we gather tenant create metric
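    # (a Prometheus histogram is exported as the _bucket/_sum/_count triple,
    # so all three series should carry the "create tenant" operation label)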
    storage_operation_metrics = [
        "pageserver_storage_operations_seconds_global_bucket",
        "pageserver_storage_operations_seconds_global_sum",
        "pageserver_storage_operations_seconds_global_count",
    ]
    for metric in storage_operation_metrics:
        value = ps_metrics.query_all(metric, filter={"operation": "create tenant"})
        assert value


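# RemoteStorageKind.MOCK_S3 (enabled below) points the pageserver at a local
# mock S3 server, so remote-storage code paths run without real credentials.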
def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilder):
    """Tests that when a tenant is detached, the tenant specific metrics are not left behind"""

    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)

    neon_env_builder.num_safekeepers = 3

    env = neon_env_builder.init_start()
    tenant_1, _ = env.neon_cli.create_tenant()
    tenant_2, _ = env.neon_cli.create_tenant()

    env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_1)
    env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_2)

    endpoint_tenant1 = env.endpoints.create_start(
        "test_metrics_removed_after_detach", tenant_id=tenant_1
    )
    endpoint_tenant2 = env.endpoints.create_start(
        "test_metrics_removed_after_detach", tenant_id=tenant_2
    )

    for endpoint in [endpoint_tenant1, endpoint_tenant2]:
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE t(key int primary key, value text)")
                cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
                cur.execute("SELECT sum(key) FROM t")
                assert cur.fetchone() == (5000050000,)
        endpoint.stop()

    def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
        ps_metrics = env.pageserver.http_client().get_metrics()
        samples = []
        for metric_name in ps_metrics.metrics:
            for sample in ps_metrics.query_all(
                name=metric_name, filter={"tenant_id": str(tenant_id)}
            ):
                samples.append(sample)
        return samples

    for tenant in [tenant_1, tenant_2]:
        pre_detach_samples = {x.name for x in get_ps_metric_samples_for_tenant(tenant)}
        expected = set(PAGESERVER_PER_TENANT_METRICS)
        assert pre_detach_samples == expected

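        # Detach tears the tenant down on this pageserver; every per-tenant
        # metric series must be unregistered with it, or stale series would
        # linger in the exporter output.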
        env.pageserver.http_client().tenant_detach(tenant)

        post_detach_samples = {x.name for x in get_ps_metric_samples_for_tenant(tenant)}
        assert post_detach_samples == set()


def test_pageserver_with_empty_tenants(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()

    env.pageserver.allowed_errors.extend(
        [
            ".*marking .* as locally complete, while it doesnt exist in remote index.*",
            ".*load failed.*list timelines directory.*",
        ]
    )

    client = env.pageserver.http_client()

    tenant_with_empty_timelines = env.initial_tenant
    timeline_delete_wait_completed(client, tenant_with_empty_timelines, env.initial_timeline)

    files_in_timelines_dir = sum(
        1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines))
    )
    assert (
        files_in_timelines_dir == 0
    ), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory"

    # Trigger timeline re-initialization after pageserver restart
    env.endpoints.stop_all()
    env.pageserver.stop()

    env.pageserver.start()

    client = env.pageserver.http_client()

    def not_attaching():
        tenants = client.tenant_list()
        assert len(tenants) == 1
        assert all(t["state"]["slug"] != "Attaching" for t in tenants)

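    # wait_until retries the callable until it stops raising assertions
    # (here: up to 10 attempts, 0.2s apart), giving the tenant time to
    # move past the Attaching state.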
    wait_until(10, 0.2, not_attaching)

    tenants = client.tenant_list()

    [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
    assert (
        loaded_tenant["state"]["slug"] == "Active"
    ), f"Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation"

    loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines)
    assert (
        loaded_tenant_status["state"]["slug"] == "Active"
    ), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active"

    time.sleep(1)  # to allow metrics propagation

    ps_metrics = client.get_metrics()
    active_tenants_metric_filter = {
        "state": "Active",
    }

    tenant_active_count = int(
        ps_metrics.query_one(
            "pageserver_tenant_states_count", filter=active_tenants_metric_filter
        ).value
    )

    assert (
        tenant_active_count == 1
    ), f"Tenant {tenant_with_empty_timelines} should be counted as Active in pageserver_tenant_states_count"