tests: add e2e deletion queue recovery test
@@ -614,5 +614,7 @@ class PageserverHttpClient(requests.Session):
         )
         self.verbose_error(res)
 
-    def deletion_queue_flush_execute(self):
-        self.put(f"http://localhost:{self.port}/v1/deletion_queue/flush_execute").raise_for_status()
+    def deletion_queue_flush(self, execute: bool = False):
+        self.put(
+            f"http://localhost:{self.port}/v1/deletion_queue/flush?execute={'true' if execute else 'false'}"
+        ).raise_for_status()
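The new client method maps onto a single pageserver HTTP endpoint. A minimal standalone sketch of the same call with plain requests is shown below; the port 9898 and the standalone usage are assumptions, while the path and the execute query parameter are taken from the hunk above.

import requests

# Hypothetical pageserver HTTP address; adjust for the local setup under test.
PAGESERVER_HTTP = "http://localhost:9898"

def flush_deletion_queue(execute: bool = False) -> None:
    # PUT /v1/deletion_queue/flush?execute=true|false, mirroring the
    # PageserverHttpClient.deletion_queue_flush() helper added above.
    res = requests.put(
        f"{PAGESERVER_HTTP}/v1/deletion_queue/flush",
        params={"execute": "true" if execute else "false"},
    )
    res.raise_for_status()

flush_deletion_queue(execute=True)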
@@ -12,7 +12,10 @@ from typing import Dict, List, Optional, Tuple
 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
+    NeonEnv,
     NeonEnvBuilder,
+    PgBin,
+    last_flush_lsn_upload,
     wait_for_last_flush_lsn,
 )
 from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
@@ -250,10 +253,6 @@ def test_remote_storage_upload_queue_retries(
 
     client = env.pageserver.http_client()
 
-    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
-
-    endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
-
     def configure_storage_write_failpoints(action):
         client.configure_failpoints(
             [
@@ -269,23 +268,6 @@ def test_remote_storage_upload_queue_retries(
             ]
         )
 
-    def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
-        # create initial set of layers & upload them with failpoints configured
-        endpoint.safe_psql_many(
-            [
-                f"""
-                INSERT INTO foo (id, val)
-                SELECT g, '{data}'
-                FROM generate_series(1, 20000) g
-                ON CONFLICT (id) DO UPDATE
-                SET val = EXCLUDED.val
-                """,
-                # to ensure that GC can actually remove some layers
-                "VACUUM foo",
-            ]
-        )
-        wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
-
     def get_queued_count(file_kind, op_kind):
         val = client.get_remote_timeline_client_metric(
             "pageserver_remote_timeline_client_calls_unfinished",
@@ -313,49 +295,19 @@ def test_remote_storage_upload_queue_retries(
         else:
             return int(executed)
 
-    def get_deletion_queue_depth() -> int:
-        """
-        Queue depth if at least one deletion has been submitted, else None
-        """
-        submitted = client.get_metric_value("pageserver_deletion_queue_submitted_total")
-
-        if submitted is None:
-            return 0
-
-        executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
-        executed = 0 if executed is None else executed
-
-        depth = submitted - executed
-        assert depth >= 0
-
-        log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})")
-        return int(depth)
-
-    # create some layers & wait for uploads to finish
-    overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
-    client.timeline_checkpoint(tenant_id, timeline_id)
-    client.timeline_compact(tenant_id, timeline_id)
-    overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b")
-    client.timeline_checkpoint(tenant_id, timeline_id)
-    client.timeline_compact(tenant_id, timeline_id)
-    gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
-    print_gc_result(gc_result)
-    assert gc_result["layers_removed"] > 0
-
-    def assert_deletion_queue(size_fn) -> None:
-        v = get_deletion_queue_depth()
-        assert v is not None
-        assert size_fn(v) is True
+    # Push some uploads into the remote_timeline_client queues, before failpoints
+    # are enabled: these should execute and the queue should revert to zero depth
+    generate_uploads_and_deletions(env, tenant_id=tenant_id, timeline_id=timeline_id)
 
     wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
     wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
 
     # Wait for some deletions to happen in the above compactions, assert that
     # our metrics of interest exist
-    wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v is not None))
+    wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v is not None))
 
     # Before enabling failpoints, flushing deletions through should work
-    client.deletion_queue_flush_execute()
+    client.deletion_queue_flush(execute=True)
     executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
     assert executed is not None
     assert executed > 0
@@ -375,15 +327,9 @@ def test_remote_storage_upload_queue_retries(
     churn_thread_result = [False]
 
     def churn_while_failpoints_active(result):
-        overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
-        client.timeline_checkpoint(tenant_id, timeline_id)
-        client.timeline_compact(tenant_id, timeline_id)
-        overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
-        client.timeline_checkpoint(tenant_id, timeline_id)
-        client.timeline_compact(tenant_id, timeline_id)
-        gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
-        print_gc_result(gc_result)
-        assert gc_result["layers_removed"] > 0
+        generate_uploads_and_deletions(
+            env, init=False, tenant_id=tenant_id, timeline_id=timeline_id, data="d"
+        )
         result[0] = True
 
     churn_while_failpoints_active_thread = threading.Thread(
@@ -399,7 +345,7 @@ def test_remote_storage_upload_queue_retries(
 
         # Deletion queue should not grow, because deletions wait for upload of
         # metadata, and we blocked that upload.
-        wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0))
+        wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0))
 
         # No more deletions should have executed
         assert get_deletions_executed() == deletions_executed_pre_failpoint
@@ -414,12 +360,12 @@ def test_remote_storage_upload_queue_retries(
 
         # Deletions should have been enqueued now that index uploads proceeded
         log.info("Waiting to see deletions enqueued")
-        wait_until(5, 1, lambda: assert_deletion_queue(lambda v: v > 0))
+        wait_until(5, 1, lambda: assert_deletion_queue(client, lambda v: v > 0))
 
         # Deletions should not be processed while failpoint is still active.
-        client.deletion_queue_flush_execute()
-        assert get_deletion_queue_depth() is not None
-        assert get_deletion_queue_depth() > 0
+        client.deletion_queue_flush(execute=True)
+        assert get_deletion_queue_depth(client) is not None
+        assert get_deletion_queue_depth(client) > 0
         assert get_deletions_executed() == deletions_executed_pre_failpoint
         assert get_deletion_errors("failpoint") > 0
 
@@ -428,9 +374,9 @@ def test_remote_storage_upload_queue_retries(
 
         # issue a flush to the deletion queue -- otherwise it won't retry until it hits
         # a deadline.
-        client.deletion_queue_flush_execute()
+        client.deletion_queue_flush(execute=True)
         # Queue should drain, which should involve executing some deletions
-        wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0))
+        wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0))
         assert get_deletions_executed() > deletions_executed_pre_failpoint
 
     finally:
@@ -1025,4 +971,156 @@ def assert_nothing_to_upload(
     assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
 
 
+def get_deletion_queue_depth(ps_http) -> int:
+    """
+    Queue depth (submitted deletions minus executed deletions); 0 if nothing has been submitted yet.
+    """
+    submitted = ps_http.get_metric_value("pageserver_deletion_queue_submitted_total")
+
+    if submitted is None:
+        return 0
+
+    executed = ps_http.get_metric_value("pageserver_deletion_queue_executed_total")
+    executed = 0 if executed is None else executed
+
+    depth = submitted - executed
+    assert depth >= 0
+
+    log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})")
+    return int(depth)
+
+
+def assert_deletion_queue(ps_http, size_fn) -> None:
+    v = get_deletion_queue_depth(ps_http)
+    assert v is not None
+    assert size_fn(v) is True
+
+
 # TODO Test that we correctly handle GC of files that are stuck in upload queue.
+
+
+def generate_uploads_and_deletions(
+    env: NeonEnv,
+    *,
+    init: bool = True,
+    tenant_id: Optional[TenantId] = None,
+    timeline_id: Optional[TimelineId] = None,
+    data: Optional[str] = None,
+):
+    """
+    Using the environment's default tenant + timeline, generate a load pattern
+    that results in some uploads and some deletions to remote storage.
+    """
+
+    if tenant_id is None:
+        tenant_id = env.initial_tenant
+    assert tenant_id is not None
+
+    if timeline_id is None:
+        timeline_id = env.initial_timeline
+    assert timeline_id is not None
+
+    ps_http = env.pageserver.http_client()
+
+    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
+        if init:
+            endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
+            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
+
+        def churn(data):
+            endpoint.safe_psql_many(
+                [
+                    f"""
+                    INSERT INTO foo (id, val)
+                    SELECT g, '{data}'
+                    FROM generate_series(1, 20000) g
+                    ON CONFLICT (id) DO UPDATE
+                    SET val = EXCLUDED.val
+                    """,
+                    # to ensure that GC can actually remove some layers
+                    "VACUUM foo",
+                ]
+            )
+            assert tenant_id is not None
+            assert timeline_id is not None
+            wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+            ps_http.timeline_checkpoint(tenant_id, timeline_id)
+
+        # Compaction should generate some GC-eligible layers
+        for i in range(0, 2):
+            churn(f"{i if data is None else data}")
+
+        gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
+        print_gc_result(gc_result)
+        assert gc_result["layers_removed"] > 0
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
+def test_deletion_queue_recovery(
+    neon_env_builder: NeonEnvBuilder,
+    remote_storage_kind: RemoteStorageKind,
+    pg_bin: PgBin,
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_deletion_queue_recovery",
+    )
+
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            # small checkpointing and compaction targets to ensure we generate many upload operations
+            "checkpoint_distance": f"{128 * 1024}",
+            "compaction_threshold": "1",
+            "compaction_target_size": f"{128 * 1024}",
+            # no PITR horizon, we specify the horizon when we request on-demand GC
+            "pitr_interval": "0s",
+            # disable background compaction and GC. We invoke it manually when we want it to happen.
+            "gc_period": "0s",
+            "compaction_period": "0s",
+            # create image layers eagerly, so that GC can remove some layers
+            "image_creation_threshold": "1",
+        }
+    )
+
+    ps_http = env.pageserver.http_client()
+
+    # Prevent deletion lists from being executed, to build up some backlog of deletions
+    ps_http.configure_failpoints(
+        [
+            ("deletion-queue-before-execute", "return"),
+        ]
+    )
+
+    generate_uploads_and_deletions(env)
+
+    # There should be entries in the deletion queue
+    assert_deletion_queue(ps_http, lambda n: n > 0)
+    ps_http.deletion_queue_flush()
+    before_restart_depth = get_deletion_queue_depth(ps_http)
+
+    log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
+    env.pageserver.stop(immediate=True)
+    env.pageserver.start()
+
+    def assert_deletions_submitted(n: int):
+        assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n
+
+    # Wait for recovery to complete (this is fast but async, so need a wait_until)
+    #
+    # After restart the failpoint is reset: execution may proceed. If we deleted enough layers
+    # to fill a DeleteObjects request, those will have executed already, so we check the total
+    # number of deletions recovered ("submitted") rather than the queue length.
+    wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth))
+
+    # The queue should drain through completely if we flush it
+    ps_http.deletion_queue_flush(execute=True)
+    wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
+
+    # Restart again
+    env.pageserver.stop(immediate=True)
+    env.pageserver.start()
+
+    # No deletion lists should be recovered: this demonstrates that deletion lists
+    # were cleaned up after being executed.
+    time.sleep(1)
+    assert_deletion_queue(ps_http, lambda n: n == 0)
@@ -275,7 +275,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
 
     # Flush deletion queue before restart/retry, so that anything logically deleted before the
     # failpoint is really deleted.
-    ps_http.deletion_queue_flush_execute()
+    ps_http.deletion_queue_flush(execute=True)
 
     if check is Check.RETRY_WITH_RESTART:
         env.pageserver.stop()
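The get_deletion_queue_depth helper above derives queue depth from two Prometheus counters exposed by the pageserver. Below is a standalone sketch of the same arithmetic, scraping the metrics endpoint directly; the /metrics path, the port, and the use of prometheus_client for parsing are assumptions, while the counter names are the ones used in the tests.

import requests
from prometheus_client.parser import text_string_to_metric_families

# Hypothetical pageserver HTTP address; adjust for the local setup under test.
PAGESERVER_HTTP = "http://localhost:9898"

def read_counter(name: str) -> float:
    # Sum all samples of the named counter from the pageserver metrics endpoint.
    text = requests.get(f"{PAGESERVER_HTTP}/metrics").text
    return sum(
        sample.value
        for family in text_string_to_metric_families(text)
        for sample in family.samples
        if sample.name == name
    )

def deletion_queue_depth() -> int:
    # Same arithmetic as get_deletion_queue_depth(): submitted minus executed.
    submitted = read_counter("pageserver_deletion_queue_submitted_total")
    executed = read_counter("pageserver_deletion_queue_executed_total")
    return int(submitted - executed)

print(deletion_queue_depth())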