From c4fc6e433db3de336598ff4950017acc5ef2b1e8 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 17 Aug 2023 10:03:34 +0100 Subject: [PATCH] tests: add e2e deletion queue recovery test --- test_runner/fixtures/pageserver/http.py | 6 +- test_runner/regress/test_remote_storage.py | 242 ++++++++++++++------ test_runner/regress/test_timeline_delete.py | 2 +- 3 files changed, 175 insertions(+), 75 deletions(-) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index dcec0fd0d8..1ef2cb05ff 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -614,5 +614,7 @@ class PageserverHttpClient(requests.Session): ) self.verbose_error(res) - def deletion_queue_flush_execute(self): - self.put(f"http://localhost:{self.port}/v1/deletion_queue/flush_execute").raise_for_status() + def deletion_queue_flush(self, execute: bool = False): + self.put( + f"http://localhost:{self.port}/v1/deletion_queue/flush?execute={'true' if execute else 'false'}" + ).raise_for_status() diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 84ebebdfc3..cd64817425 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -12,7 +12,10 @@ from typing import Dict, List, Optional, Tuple import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( + NeonEnv, NeonEnvBuilder, + PgBin, + last_flush_lsn_upload, wait_for_last_flush_lsn, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient @@ -250,10 +253,6 @@ def test_remote_storage_upload_queue_retries( client = env.pageserver.http_client() - endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) - - endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") - def configure_storage_write_failpoints(action): client.configure_failpoints( [ @@ -269,23 +268,6 @@ def 
test_remote_storage_upload_queue_retries( ] ) - def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data): - # create initial set of layers & upload them with failpoints configured - endpoint.safe_psql_many( - [ - f""" - INSERT INTO foo (id, val) - SELECT g, '{data}' - FROM generate_series(1, 20000) g - ON CONFLICT (id) DO UPDATE - SET val = EXCLUDED.val - """, - # to ensure that GC can actually remove some layers - "VACUUM foo", - ] - ) - wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - def get_queued_count(file_kind, op_kind): val = client.get_remote_timeline_client_metric( "pageserver_remote_timeline_client_calls_unfinished", @@ -313,49 +295,19 @@ def test_remote_storage_upload_queue_retries( else: return int(executed) - def get_deletion_queue_depth() -> int: - """ - Queue depth if at least one deletion has been submitted, else None - """ - submitted = client.get_metric_value("pageserver_deletion_queue_submitted_total") - - if submitted is None: - return 0 - - executed = client.get_metric_value("pageserver_deletion_queue_executed_total") - executed = 0 if executed is None else executed - - depth = submitted - executed - assert depth >= 0 - - log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})") - return int(depth) - - # create some layers & wait for uploads to finish - overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a") - client.timeline_checkpoint(tenant_id, timeline_id) - client.timeline_compact(tenant_id, timeline_id) - overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b") - client.timeline_checkpoint(tenant_id, timeline_id) - client.timeline_compact(tenant_id, timeline_id) - gc_result = client.timeline_gc(tenant_id, timeline_id, 0) - print_gc_result(gc_result) - assert gc_result["layers_removed"] > 0 - - def assert_deletion_queue(size_fn) -> None: - v = get_deletion_queue_depth() - assert v is not None - assert size_fn(v) is True + # Push some uploads into the remote_timeline_client queues, before 
failpoints + # are enabled: these should execute and the queue should revert to zero depth + generate_uploads_and_deletions(env, tenant_id=tenant_id, timeline_id=timeline_id) wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0) wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0) # Wait for some deletions to happen in the above compactions, assert that # our metrics of interest exist - wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v is not None)) + wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v is not None)) # Before enabling failpoints, flushing deletions through should work - client.deletion_queue_flush_execute() + client.deletion_queue_flush(execute=True) executed = client.get_metric_value("pageserver_deletion_queue_executed_total") assert executed is not None assert executed > 0 @@ -375,15 +327,9 @@ def test_remote_storage_upload_queue_retries( churn_thread_result = [False] def churn_while_failpoints_active(result): - overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c") - client.timeline_checkpoint(tenant_id, timeline_id) - client.timeline_compact(tenant_id, timeline_id) - overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d") - client.timeline_checkpoint(tenant_id, timeline_id) - client.timeline_compact(tenant_id, timeline_id) - gc_result = client.timeline_gc(tenant_id, timeline_id, 0) - print_gc_result(gc_result) - assert gc_result["layers_removed"] > 0 + generate_uploads_and_deletions( + env, init=False, tenant_id=tenant_id, timeline_id=timeline_id, data="d" + ) result[0] = True churn_while_failpoints_active_thread = threading.Thread( @@ -399,7 +345,7 @@ def test_remote_storage_upload_queue_retries( # Deletion queue should not grow, because deletions wait for upload of # metadata, and we blocked that upload. 
- wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0)) + wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0)) # No more deletions should have executed assert get_deletions_executed() == deletions_executed_pre_failpoint @@ -414,12 +360,12 @@ def test_remote_storage_upload_queue_retries( # Deletions should have been enqueued now that index uploads proceeded log.info("Waiting to see deletions enqueued") - wait_until(5, 1, lambda: assert_deletion_queue(lambda v: v > 0)) + wait_until(5, 1, lambda: assert_deletion_queue(client, lambda v: v > 0)) # Deletions should not be processed while failpoint is still active. - client.deletion_queue_flush_execute() - assert get_deletion_queue_depth() is not None - assert get_deletion_queue_depth() > 0 + client.deletion_queue_flush(execute=True) + assert get_deletion_queue_depth(client) is not None + assert get_deletion_queue_depth(client) > 0 assert get_deletions_executed() == deletions_executed_pre_failpoint assert get_deletion_errors("failpoint") > 0 @@ -428,9 +374,9 @@ def test_remote_storage_upload_queue_retries( # issue a flush to the deletion queue -- otherwise it won't retry until hits # a deadline. 
- client.deletion_queue_flush_execute() + client.deletion_queue_flush(execute=True) # Queue should drain, which should involve executing some deletions - wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0)) + wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0)) assert get_deletions_executed() > deletions_executed_pre_failpoint finally: @@ -1025,4 +971,156 @@ def assert_nothing_to_upload( assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"]) +def get_deletion_queue_depth(ps_http) -> int: + """ + Queue depth if at least one deletion has been submitted, else 0 + """ + submitted = ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") + + if submitted is None: + return 0 + + executed = ps_http.get_metric_value("pageserver_deletion_queue_executed_total") + executed = 0 if executed is None else executed + + depth = submitted - executed + assert depth >= 0 + + log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})") + return int(depth) + + +def assert_deletion_queue(ps_http, size_fn) -> None: + v = get_deletion_queue_depth(ps_http) + assert v is not None + assert size_fn(v) is True + + # TODO Test that we correctly handle GC of files that are stuck in upload queue. + + +def generate_uploads_and_deletions( + env: NeonEnv, + *, + init: bool = True, + tenant_id: Optional[TenantId] = None, + timeline_id: Optional[TimelineId] = None, + data: Optional[str] = None, +): + """ + Using the environment's default tenant + timeline, generate a load pattern + that results in some uploads and some deletions to remote storage.
+ """ + + if tenant_id is None: + tenant_id = env.initial_tenant + assert tenant_id is not None + + if timeline_id is None: + timeline_id = env.initial_timeline + assert timeline_id is not None + + ps_http = env.pageserver.http_client() + + with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: + if init: + endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") + last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) + + def churn(data): + endpoint.safe_psql_many( + [ + f""" + INSERT INTO foo (id, val) + SELECT g, '{data}' + FROM generate_series(1, 20000) g + ON CONFLICT (id) DO UPDATE + SET val = EXCLUDED.val + """, + # to ensure that GC can actually remove some layers + "VACUUM foo", + ] + ) + assert tenant_id is not None + assert timeline_id is not None + wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + ps_http.timeline_checkpoint(tenant_id, timeline_id) + + # Compaction should generate some GC-eligible layers + for i in range(0, 2): + churn(f"{i if data is None else data}") + + gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0) + print_gc_result(gc_result) + assert gc_result["layers_removed"] > 0 + + +@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) +def test_deletion_queue_recovery( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, + pg_bin: PgBin, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_deletion_queue_recovery", + ) + + env = neon_env_builder.init_start( + initial_tenant_conf={ + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": f"{128 * 1024}", + "compaction_threshold": "1", + "compaction_target_size": f"{128 * 1024}", + # no PITR horizon, we specify the horizon when we request on-demand GC + "pitr_interval": "0s", + # disable background compaction and GC. We invoke it manually when we want it to happen.
+ "gc_period": "0s", + "compaction_period": "0s", + # create image layers eagerly, so that GC can remove some layers + "image_creation_threshold": "1", + } + ) + + ps_http = env.pageserver.http_client() + + # Prevent deletion lists from being executed, to build up some backlog of deletions + ps_http.configure_failpoints( + [ + ("deletion-queue-before-execute", "return"), + ] + ) + + generate_uploads_and_deletions(env) + + # There should be entries in the deletion queue + assert_deletion_queue(ps_http, lambda n: n > 0) + ps_http.deletion_queue_flush() + before_restart_depth = get_deletion_queue_depth(ps_http) + + log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued") + env.pageserver.stop(immediate=True) + env.pageserver.start() + + def assert_deletions_submitted(n: int): + assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n + + # Wait for recovery to complete (this is fast but async, so need a wait_until) + # + # After restart the failpoint is reset: execution may proceed. If we deleted enough layers + # to fill a DeleteObjects request, those will have executed already, so we check the total + # number of deletions recovered ("submitted") rather than the queue length. + wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth)) + + # The queue should drain through completely if we flush it + ps_http.deletion_queue_flush(execute=True) + wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0)) + + # Restart again + env.pageserver.stop(immediate=True) + env.pageserver.start() + + # No deletion lists should be recovered: this demonstrates that deletion lists + # were cleaned up after being executed. 
+ time.sleep(1) + assert_deletion_queue(ps_http, lambda n: n == 0) diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index ee0bf1e1fd..bd65e6cf83 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -275,7 +275,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints( # Flush deletion queue before restart/retry, so that anything logically deleted before the # failpoint is really deleted. - ps_http.deletion_queue_flush_execute() + ps_http.deletion_queue_flush(execute=True) if check is Check.RETRY_WITH_RESTART: env.pageserver.stop()