mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
146 lines
4.5 KiB
Python
146 lines
4.5 KiB
Python
import time
|
|
|
|
from fixtures.log_helper import log
|
|
from fixtures.pageserver.http import PageserverHttpClient
|
|
from fixtures.types import Lsn, TenantId, TimelineId
|
|
|
|
|
|
def assert_tenant_status(
    pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
):
    """Fetch the status of `tenant`, log it, and assert its "state" equals `expected_status`.

    On a mismatch the AssertionError carries the whole status payload as its message.
    """
    tenant_status = pageserver_http.tenant_status(tenant)
    log.info(f"tenant_status: {tenant_status}")

    actual_state = tenant_status["state"]
    assert actual_state == expected_status, tenant_status
|
|
|
|
|
|
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
    """Look up `tenant_id` in the pageserver's tenant list.

    Returns the matching list entry, or None when no listed tenant has this id.
    Asserts that the id appears at most once in the list.
    """
    candidates = [
        entry for entry in pageserver_http.tenant_list() if TenantId(entry["id"]) == tenant_id
    ]
    assert len(candidates) < 2
    return candidates[0] if candidates else None
|
|
|
|
|
|
def remote_consistent_lsn(
    pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
    """Return the `remote_consistent_lsn` reported in the timeline detail.

    Returns Lsn(0) when the detail reports None for that field.
    """
    timeline_detail = pageserver_http.timeline_detail(tenant, timeline)
    raw_lsn = timeline_detail["remote_consistent_lsn"]

    if raw_lsn is None:
        # No remote information at all. This happens right after creating
        # a timeline, before any part of it has been uploaded to remote
        # storage yet.
        return Lsn(0)

    assert isinstance(raw_lsn, str)
    return Lsn(raw_lsn)
|
|
|
|
|
|
def wait_for_upload(
    pageserver_http: PageserverHttpClient,
    tenant: TenantId,
    timeline: TimelineId,
    lsn: Lsn,
):
    """Wait until the timeline's remote_consistent_lsn has reached `lsn`.

    Checks up to 20 times, sleeping one second after every unsuccessful check.
    Raises Exception if the target LSN is still not reached after the last check.
    """
    max_attempts = 20
    for attempt in range(1, max_attempts + 1):
        current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
        if current_lsn >= lsn:
            log.info("wait finished")
            return

        progress = "waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
            lsn, current_lsn, attempt
        )
        log.info(progress)
        time.sleep(1)

    failure = "timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
        lsn, current_lsn
    )
    raise Exception(failure)
|
|
|
|
|
|
def wait_until_tenant_state(
    pageserver_http: PageserverHttpClient,
    tenant_id: TenantId,
    expected_state: str,
    iterations: int,
) -> bool:
    """
    Poll the tenant status until its "state" equals `expected_state`.

    Makes up to `iterations` attempts and sleeps one second after each
    unsuccessful one. Returns True as soon as the state matches. Failures to
    retrieve the status are logged at debug level and retried.

    Raises Exception when the state was never observed; if the most recent
    attempt failed to retrieve the status, that failure is attached as the
    exception's cause so it shows up in the traceback.

    Does not use `wait_until` for debugging purposes
    """
    # Most recent retrieval failure; cleared whenever a status is fetched
    # successfully so that a stale error is never reported as the cause.
    last_error = None

    for _ in range(iterations):
        try:
            tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
            log.debug(f"Tenant {tenant_id} data: {tenant}")
            if tenant["state"] == expected_state:
                return True
            last_error = None
        except Exception as e:
            log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
            last_error = e

        time.sleep(1)

    raise Exception(
        f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds"
    ) from last_error
|
|
|
|
|
|
def wait_until_tenant_active(
    pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
):
    """Wait for the tenant to reach the "Active" state.

    Delegates to `wait_until_tenant_state` with the given `iterations` budget;
    the helper's return value is not propagated.
    """
    wait_until_tenant_state(
        pageserver_http=pageserver_http,
        tenant_id=tenant_id,
        expected_state="Active",
        iterations=iterations,
    )
|
|
|
|
|
|
def last_record_lsn(
    pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
    """Return the `last_record_lsn` reported in the timeline detail as an Lsn.

    Asserts that the reported value is a string before converting it.
    """
    raw_lsn = pageserver_http_client.timeline_detail(tenant, timeline)["last_record_lsn"]
    assert isinstance(raw_lsn, str)
    return Lsn(raw_lsn)
|
|
|
|
|
|
def wait_for_last_record_lsn(
    pageserver_http: PageserverHttpClient,
    tenant: TenantId,
    timeline: TimelineId,
    lsn: Lsn,
) -> Lsn:
    """Wait for the pageserver's last_record_lsn to reach `lsn`.

    Checks up to 10 times, sleeping one second after every unsuccessful check.
    Returns the last observed LSN once it is at or beyond `lsn`; raises
    Exception if that never happens within the allotted checks.
    """
    max_attempts = 10
    for attempt in range(1, max_attempts + 1):
        current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
        if current_lsn >= lsn:
            return current_lsn

        progress = "waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
            lsn, current_lsn, attempt
        )
        log.info(progress)
        time.sleep(1)

    failure = "timed out while waiting for last_record_lsn to reach {}, was {}".format(
        lsn, current_lsn
    )
    raise Exception(failure)
|
|
|
|
|
|
def wait_for_upload_queue_empty(
    pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
    """Block until the timeline's unfinished remote-client call metrics are all zero.

    Re-reads the pageserver metrics every 0.2 seconds and returns once every
    `pageserver_remote_timeline_client_calls_unfinished` sample for this
    tenant/timeline has value 0. Asserts that at least one such sample exists.
    There is no timeout: the loop runs until the condition holds.
    """
    metric_name = "pageserver_remote_timeline_client_calls_unfinished"
    tenant_label = str(tenant_id)
    timeline_label = str(timeline_id)

    while True:
        tl = pageserver_http.get_metrics().query_all(
            metric_name,
            {
                "tenant_id": tenant_label,
                "timeline_id": timeline_label,
            },
        )
        assert len(tl) > 0
        log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")

        if not any(sample.value != 0 for sample in tl):
            return
        time.sleep(0.2)
|