diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index a179ebdd09..dcec0fd0d8 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -613,3 +613,6 @@ class PageserverHttpClient(requests.Session):
             },
         )
         self.verbose_error(res)
+
+    def deletion_queue_flush_execute(self):
+        self.put(f"http://localhost:{self.port}/v1/deletion_queue/flush_execute").raise_for_status()
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 4f5b193ce2..7e39b093b7 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -254,12 +254,18 @@ def test_remote_storage_upload_queue_retries(
 
     endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
 
-    def configure_storage_sync_failpoints(action):
+    def configure_storage_write_failpoints(action):
         client.configure_failpoints(
             [
                 ("before-upload-layer", action),
                 ("before-upload-index", action),
-                ("before-delete-layer", action),
+            ]
+        )
+
+    def configure_storage_delete_failpoints(action):
+        client.configure_failpoints(
+            [
+                ("deletion-queue-before-execute", action),
             ]
         )
 
@@ -291,6 +297,40 @@ def test_remote_storage_upload_queue_retries(
         assert val is not None, "expecting metric to be present"
         return int(val)
 
+    def get_deletions_executed() -> int:
+        executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
+        if executed is None:
+            return 0
+        else:
+            return int(executed)
+
+    def get_deletion_errors(op_type: str) -> int:
+        errors = client.get_metric_value(
+            "pageserver_deletion_queue_errors_total", {"op_kind": op_type}
+        )
+        if errors is None:
+            return 0
+        else:
+            return int(errors)
+
+    def get_deletion_queue_depth() -> int:
+        """
+        Queue depth (submitted - executed), or 0 if nothing has been submitted yet.
+        """
+        submitted = client.get_metric_value("pageserver_deletion_queue_submitted_total")
+
+        if submitted is None:
+            return 0
+
+        executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
+        executed = 0 if executed is None else executed
+
+        depth = submitted - executed
+        assert depth >= 0
+
+        log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})")
+        return int(depth)
+
     # create some layers & wait for uploads to finish
     overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
     client.timeline_checkpoint(tenant_id, timeline_id)
@@ -302,12 +342,29 @@ def test_remote_storage_upload_queue_retries(
     print_gc_result(gc_result)
     assert gc_result["layers_removed"] > 0
 
+    def assert_deletion_queue(size_fn) -> None:
+        v = get_deletion_queue_depth()
+        assert size_fn(v)
+
     wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
     wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
-    wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
+
+    # Wait for some deletions to be submitted by the compaction/GC above, and
+    # assert that our metrics of interest exist
+    wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v > 0))
+
+    # Before enabling failpoints, flushing deletions through should work
+    client.deletion_queue_flush_execute()
+    assert get_deletions_executed() > 0
 
     # let all future operations queue up
-    configure_storage_sync_failpoints("return")
+    configure_storage_write_failpoints("return")
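+    # Deletions are gated by their own failpoint ("deletion-queue-before-execute"),
+    # so remote writes and deletes can be blocked and unblocked independently below.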
+    configure_storage_delete_failpoints("return")
+
+    # Snapshot of executed deletions: should not increment while the failpoints are active
+    deletions_executed_pre_failpoint = get_deletions_executed()
 
     # Create more churn to generate all upload ops.
     # The checkpoint / compact / gc ops will block because they call remote_client.wait_completion().
@@ -329,24 +389,57 @@ def test_remote_storage_upload_queue_retries(
     churn_while_failpoints_active_thread = threading.Thread(
         target=churn_while_failpoints_active, args=[churn_thread_result]
    )
+    log.info("Entered churn phase")
     churn_while_failpoints_active_thread.start()
 
-    # wait for churn thread's data to get stuck in the upload queue
-    wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
-    wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
-    wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0)
+    try:
+        # wait for churn thread's data to get stuck in the upload queue
+        wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
+        wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
 
-    # unblock churn operations
-    configure_storage_sync_failpoints("off")
+        # Deletion queue should not grow, because deletions wait for upload of
+        # metadata, and we blocked that upload.
+        wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0))
 
-    # ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts.
-    wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
-    wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
-    wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
+        # No more deletions should have executed
+        assert get_deletions_executed() == deletions_executed_pre_failpoint
+
+        # unblock write operations
+        log.info("Unblocking remote writes")
+        configure_storage_write_failpoints("off")
+
+        # ... and wait for them to finish. Exponential back-off in upload queue, so, generous timeouts.
+        wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
+        wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
+
+        # Deletions should have been enqueued now that index uploads proceeded
+        log.info("Waiting to see deletions enqueued")
+        wait_until(5, 1, lambda: assert_deletion_queue(lambda v: v > 0))
+
+        # Deletions should not be processed while the delete failpoint is still active.
+        client.deletion_queue_flush_execute()
+        assert get_deletion_queue_depth() > 0
+        assert get_deletions_executed() == deletions_executed_pre_failpoint
+        assert get_deletion_errors("failpoint") > 0
+
+        log.info("Unblocking remote deletes")
+        configure_storage_delete_failpoints("off")
+
+        # Issue a flush to the deletion queue -- otherwise it won't retry until it
+        # hits a deadline.
+        client.deletion_queue_flush_execute()
+        # Queue should drain, which should involve executing some deletions
+        wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0))
+        assert get_deletions_executed() > deletions_executed_pre_failpoint
+
+    finally:
+        # The churn thread doesn't make progress once it blocks on the first wait_completion() call,
+        # so, give it some time to wrap up.
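+        # join(30) returns even if the thread is still running; the
+        # is_alive() assertion below is what catches a hung churn thread.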
+        log.info("Joining churn workload")
+        churn_while_failpoints_active_thread.join(30)
+        log.info("Joined churn workload")
 
-    # The churn thread doesn't make progress once it blocks on the first wait_completion() call,
-    # so, give it some time to wrap up.
-    churn_while_failpoints_active_thread.join(30)
     assert not churn_while_failpoints_active_thread.is_alive()
     assert churn_thread_result[0]
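
For reference: the flush-then-poll sequence used in the test is the general pattern for
deterministically draining the deletion queue. A minimal sketch, assuming the helpers
added in this diff and the fixtures' wait_until(retries, interval, func) convention
(func must raise to keep the poll going); drain_deletion_queue is a hypothetical
helper, not part of the patch:

    def drain_deletion_queue(client: PageserverHttpClient) -> None:
        # Ask the deletion queue to execute pending deletions now, rather
        # than waiting for its internal deadline-based retry.
        client.deletion_queue_flush_execute()

        def assert_drained() -> None:
            # The queue is drained when everything submitted has been executed.
            submitted = client.get_metric_value("pageserver_deletion_queue_submitted_total") or 0
            executed = client.get_metric_value("pageserver_deletion_queue_executed_total") or 0
            assert submitted - executed == 0

        wait_until(10, 1, assert_drained)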