tests: update remote storage test for deletion queue

Author: John Spray
Date:   2023-08-15 10:06:22 +01:00
parent 3edd7ece40
commit 43c9a09d8f

2 changed files with 113 additions and 17 deletions


@@ -613,3 +613,6 @@ class PageserverHttpClient(requests.Session):
             },
         )
         self.verbose_error(res)
+
+    def deletion_queue_flush_execute(self):
+        self.put(f"http://localhost:{self.port}/v1/deletion_queue/flush_execute").raise_for_status()
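The new helper pairs a PUT to the flush_execute endpoint with raise_for_status(). A minimal usage sketch, assuming a PageserverHttpClient instance named client as in the test changes below, with the metric name taken from the hunks that follow:

    # Force pending deletions to execute now rather than waiting for the
    # queue's own deadline, then confirm at least one deletion ran.
    client.deletion_queue_flush_execute()
    executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
    assert executed is not None and executed > 0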


@@ -254,12 +254,18 @@ def test_remote_storage_upload_queue_retries(
     endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")

-    def configure_storage_sync_failpoints(action):
+    def configure_storage_write_failpoints(action):
         client.configure_failpoints(
             [
                 ("before-upload-layer", action),
                 ("before-upload-index", action),
                 ("before-delete-layer", action),
             ]
         )

+    def configure_storage_delete_failpoints(action):
+        client.configure_failpoints(
+            [
+                ("deletion-queue-before-execute", action),
+            ]
+        )
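Both helpers wrap configure_failpoints, which takes (failpoint name, action) pairs; in this test, "return" makes the named code path fail immediately and "off" restores normal behavior. A sketch of the toggle pattern used later in the diff:

    configure_storage_delete_failpoints("return")  # remote deletes now fail at the failpoint
    # ... exercise the pageserver while deletes are failing ...
    configure_storage_delete_failpoints("off")     # deletes succeed again on retry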
@@ -291,6 +297,40 @@ def test_remote_storage_upload_queue_retries(
         assert val is not None, "expecting metric to be present"
         return int(val)

+    def get_deletions_executed() -> int:
+        executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
+        if executed is None:
+            return 0
+        else:
+            return int(executed)
+
+    def get_deletion_errors(op_type) -> int:
+        errors = client.get_metric_value(
+            "pageserver_deletion_queue_errors_total", {"op_kind": op_type}
+        )
+        if errors is None:
+            return 0
+        else:
+            return int(errors)
+
+    def get_deletion_queue_depth() -> int:
+        """
+        Queue depth: deletions submitted but not yet executed (0 if nothing
+        has been submitted yet).
+        """
+        submitted = client.get_metric_value("pageserver_deletion_queue_submitted_total")
+        if submitted is None:
+            return 0
+        executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
+        executed = 0 if executed is None else executed
+        depth = submitted - executed
+        assert depth >= 0
+        log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})")
+        return int(depth)

     # create some layers & wait for uploads to finish
     overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
     client.timeline_checkpoint(tenant_id, timeline_id)
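get_deletion_queue_depth derives a gauge from two monotonic counters. A worked example of that arithmetic, with illustrative numbers rather than values from the test:

    # 10 deletions submitted, 7 executed: 3 are still queued.
    submitted, executed = 10, 7
    depth = submitted - executed  # never negative, since executed trails submitted
    assert depth == 3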
@@ -302,12 +342,32 @@ def test_remote_storage_upload_queue_retries(
     print_gc_result(gc_result)
     assert gc_result["layers_removed"] > 0

+    def assert_deletion_queue(size_fn) -> None:
+        v = get_deletion_queue_depth()
+        assert v is not None
+        assert size_fn(v) is True
+
     wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
     wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
     wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)

+    # Wait for some deletions to happen in the above compactions, and assert that
+    # our metrics of interest exist
+    wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v is not None))
+
+    # Before enabling failpoints, flushing deletions through should work
+    client.deletion_queue_flush_execute()
+    executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
+    assert executed is not None
+    assert executed > 0
+
     # let all future operations queue up
-    configure_storage_sync_failpoints("return")
+    configure_storage_write_failpoints("return")
+    configure_storage_delete_failpoints("return")

+    # Snapshot of executed deletions: this should not increment while the failpoint is enabled
+    deletions_executed_pre_failpoint = client.get_metric_value(
+        "pageserver_deletion_queue_executed_total"
+    )

     # Create more churn to generate all upload ops.
     # The checkpoint / compact / gc ops will block because they call remote_client.wait_completion().
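The snapshot above supports a counter-freeze assertion: record the counter before enabling the failpoint, then assert later that it has not advanced. A generic sketch of the pattern, using the helper defined earlier in the diff:

    before = get_deletions_executed()
    # ... trigger work that would normally execute deletions ...
    assert get_deletions_executed() == before  # failpoint kept the counter frozen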
@@ -329,24 +389,57 @@ def test_remote_storage_upload_queue_retries(
     churn_while_failpoints_active_thread = threading.Thread(
         target=churn_while_failpoints_active, args=[churn_thread_result]
     )
+    log.info("Entered churn phase")
     churn_while_failpoints_active_thread.start()

-    # wait for the churn thread's data to get stuck in the upload queue
-    wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
-    wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
-    wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0)
+    try:
+        # wait for the churn thread's data to get stuck in the upload queue
+        wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
+        wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
+
-    # unblock churn operations
-    configure_storage_sync_failpoints("off")
+        # The deletion queue should not grow, because deletions wait for the upload of
+        # metadata, and we have blocked that upload.
+        wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0))
+
-    # ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts.
-    wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
-    wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
-    wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
+        # No more deletions should have executed
+        assert get_deletions_executed() == deletions_executed_pre_failpoint
+
+        # unblock write operations
+        log.info("Unblocking remote writes")
+        configure_storage_write_failpoints("off")
+
+        # ... and wait for them to finish. Exponential back-off in the upload queue, so, gracious timeouts.
+        wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
+        wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
+
+        # Deletions should have been enqueued now that index uploads have proceeded
+        log.info("Waiting to see deletions enqueued")
+        wait_until(5, 1, lambda: assert_deletion_queue(lambda v: v > 0))
+
+        # Deletions should not be processed while the failpoint is still active.
+        client.deletion_queue_flush_execute()
+        assert get_deletion_queue_depth() > 0
+        assert get_deletions_executed() == deletions_executed_pre_failpoint
+        assert get_deletion_errors("failpoint") > 0
+
+        log.info("Unblocking remote deletes")
+        configure_storage_delete_failpoints("off")
+
+        # Issue a flush to the deletion queue -- otherwise it won't retry until it hits
+        # a deadline.
+        client.deletion_queue_flush_execute()
+
+        # The queue should drain, which should involve executing some deletions
+        wait_until(2, 1, lambda: assert_deletion_queue(lambda v: v == 0))
+        assert get_deletions_executed() > deletions_executed_pre_failpoint
+    finally:
+        # The churn thread doesn't make progress once it blocks on the first wait_completion() call,
+        # so, give it some time to wrap up.
+        log.info("Joining churn workload")
+        churn_while_failpoints_active_thread.join(30)
+        log.info("Joined churn workload")

-    # The churn thread doesn't make progress once it blocks on the first wait_completion() call,
-    # so, give it some time to wrap up.
-    churn_while_failpoints_active_thread.join(30)
     assert not churn_while_failpoints_active_thread.is_alive()
     assert churn_thread_result[0]
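The try/finally restructuring guarantees the churn thread is joined even if an assertion in the body fails, so the test reports the real error instead of hanging on an orphaned thread. A self-contained sketch of the same pattern, with illustrative names not taken from the test:

    import threading

    def run_with_background_churn():
        result = [False]

        def churn(out):
            # ... background workload against the system under test ...
            out[0] = True

        thread = threading.Thread(target=churn, args=[result])
        thread.start()
        try:
            pass  # assertions against the system under test go here
        finally:
            # Join with a timeout so a wedged thread shows up as a failed
            # assertion below rather than an indefinite hang.
            thread.join(30)
        assert not thread.is_alive()
        assert result[0]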