neon/test_runner/fixtures/safekeeper_utils.py
John Spray b953daa21f safekeeper: allow remote deletion to proceed after dropped requests (#11042)
## Problem

If a caller times out on safekeeper timeline deletion for a large
timeline and waits a while before retrying, deletion makes no progress
during that wait. The net effect is very slow deletion, since it only
proceeds in 30-second bursts separated by 5-minute idle periods.

Related: https://github.com/neondatabase/neon/issues/10265

## Summary of changes

- Run remote deletion in a background task
- Carry a watch::Receiver on the Timeline for other callers to join the
wait
- Restart deletion if the API is called again and the previous attempt
failed
2025-03-03 16:03:51 +00:00
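The actual change lives in the safekeeper's Rust code (a background task plus a `watch::Receiver` on the Timeline). Purely as an illustration of the coordination pattern the bullets above describe, here is a minimal Python-flavored sketch; the class and names (`TimelineDeletion`, `request_delete`, `do_remote_delete`) are hypothetical and not part of the real implementation:

```python
import threading


class TimelineDeletion:
    """Illustrative sketch only: run deletion in the background, let later
    callers join the in-progress attempt, and restart only after a failure."""

    def __init__(self, do_remote_delete):
        self._do_remote_delete = do_remote_delete  # stands in for remote-storage deletion work
        self._lock = threading.Lock()
        self._done = None  # threading.Event, set when the current attempt finishes
        self._failed = False

    def request_delete(self, timeout: float) -> None:
        with self._lock:
            # Start a background attempt if none has run yet, or restart one
            # if the previous attempt finished with an error.
            if self._done is None or (self._done.is_set() and self._failed):
                self._done = threading.Event()
                threading.Thread(target=self._run, args=(self._done,), daemon=True).start()
            done = self._done
        # Every caller, including a retry after a client-side timeout, joins the
        # same in-progress attempt instead of blocking it or starting over.
        if not done.wait(timeout):
            raise TimeoutError("deletion still running in the background")

    def _run(self, done: threading.Event) -> None:
        try:
            self._do_remote_delete()
            self._failed = False
        except Exception:
            self._failed = True
        finally:
            done.set()
```

The key point is that work keeps going while a client backs off between retries, and a fresh attempt is started only when the previous one actually failed.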

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonPageserver, Safekeeper
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import get_dir_size


def is_segment_offloaded(
    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
    """Return True if the safekeeper has backed up WAL to remote storage up to at least seg_end."""
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.backup_lsn >= seg_end


def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
    """Return True if the safekeeper's flush_lsn has reached the given lsn."""
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.flush_lsn >= lsn


def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
    """Return True if the safekeeper's on-disk WAL for the timeline is at most target_size_mb MiB."""
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    sk_wal_size = get_dir_size(sk.timeline_dir(tenant_id, timeline_id))
    sk_wal_size_mb = sk_wal_size / 1024 / 1024
    log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
    return sk_wal_size_mb <= target_size_mb


def wait_lsn_force_checkpoint(
    tenant_id: TenantId,
    timeline_id: TimelineId,
    endpoint: Endpoint,
    ps: NeonPageserver,
    pageserver_conn_options=None,
):
    """Wait for the endpoint's current flush LSN on the pageserver, then force a checkpoint and wait for upload."""
    pageserver_conn_options = pageserver_conn_options or {}
    lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
    log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
    wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)


def wait_lsn_force_checkpoint_at_sk(
    safekeeper: Safekeeper,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    ps: NeonPageserver,
    pageserver_conn_options=None,
):
    """Like wait_lsn_force_checkpoint, but wait for the safekeeper's current flush_lsn."""
    sk_flush_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id)
    wait_lsn_force_checkpoint_at(sk_flush_lsn, tenant_id, timeline_id, ps, pageserver_conn_options)


def wait_lsn_force_checkpoint_at(
    lsn: Lsn,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    ps: NeonPageserver,
    pageserver_conn_options=None,
):
    """
    Wait until the pageserver receives the given lsn, force a checkpoint, and wait for
    the upload, i.e. remote_consistent_lsn advancement.
    """
    pageserver_conn_options = pageserver_conn_options or {}
    auth_token = None
    if "password" in pageserver_conn_options:
        auth_token = pageserver_conn_options["password"]

    # wait for the pageserver to catch up
    wait_for_last_record_lsn(
        ps.http_client(auth_token=auth_token),
        tenant_id,
        timeline_id,
        lsn,
    )

    # force checkpoint to advance remote_consistent_lsn
    ps.http_client(auth_token=auth_token).timeline_checkpoint(tenant_id, timeline_id)

    # ensure that remote_consistent_lsn is advanced
    wait_for_upload(
        ps.http_client(auth_token=auth_token),
        tenant_id,
        timeline_id,
        lsn,
    )
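The `is_*` helpers above return booleans and do not wait themselves; tests poll them (in this repo typically via the shared `wait_until` helper from `fixtures.utils`, whose signature is not shown here). As a self-contained illustration, a hypothetical polling wrapper around `is_wal_trimmed` might look like this; the function name, timeout, and poll interval are arbitrary:

```python
import time


def wait_for_wal_trim(sk, tenant_id, timeline_id, target_size_mb, timeout_s: float = 60.0):
    """Hypothetical helper: poll is_wal_trimmed() until the safekeeper's on-disk WAL
    shrinks to at most target_size_mb MiB, or raise after timeout_s seconds."""
    deadline = time.monotonic() + timeout_s
    while not is_wal_trimmed(sk, tenant_id, timeline_id, target_size_mb):
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"safekeeper {sk.id}: WAL not trimmed to {target_size_mb}MB within {timeout_s}s"
            )
        time.sleep(1)
```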