mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
test_remote_storage_upload_queue_retries: actually generate Delete ops
This commit is contained in:
@@ -243,20 +243,6 @@ def test_remote_storage_upload_queue_retries(
|
||||
)
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
|
||||
# let all of them queue up
|
||||
configure_storage_sync_failpoints("return")
|
||||
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
# now overwrite it again
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b")
|
||||
# trigger layer deletion by doing Compaction, then GC
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
print_gc_result(gc_result)
|
||||
assert gc_result["layers_removed"] > 0
|
||||
|
||||
# confirm all operations are queued up
|
||||
def get_queued_count(file_kind, op_kind):
|
||||
metrics = client.get_metrics()
|
||||
matches = re.search(
|
||||
@@ -267,14 +253,43 @@ def test_remote_storage_upload_queue_retries(
|
||||
assert matches
|
||||
return int(matches[1])
|
||||
|
||||
# create some layers & wait for uploads to finish
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
print_gc_result(gc_result)
|
||||
assert gc_result["layers_removed"] > 0
|
||||
|
||||
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
|
||||
wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
|
||||
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
|
||||
|
||||
# let all future operations queue up
|
||||
configure_storage_sync_failpoints("return")
|
||||
|
||||
# create more churn to generate all upload ops
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
print_gc_result(gc_result)
|
||||
assert gc_result["layers_removed"] > 0
|
||||
|
||||
# ensure that all operation types that can be in the upload queue have queued up
|
||||
assert get_queued_count(file_kind="layer", op_kind="upload") > 0
|
||||
assert get_queued_count(file_kind="index", op_kind="upload") >= 2
|
||||
assert get_queued_count(file_kind="layer", op_kind="remove") > 0
|
||||
assert get_queued_count(file_kind="layer", op_kind="delete") > 0
|
||||
|
||||
# unblock all operations and wait for them to finish
|
||||
configure_storage_sync_failpoints("off")
|
||||
wait_until(10, 1, lambda: get_queued_count() == 0)
|
||||
|
||||
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
|
||||
wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
|
||||
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
|
||||
|
||||
# try a restore to verify that the uploads worked
|
||||
# XXX: should vary this test to selectively fail just layer uploads, index uploads, deletions
|
||||
@@ -303,4 +318,4 @@ def test_remote_storage_upload_queue_retries(
|
||||
log.info("restarting postgres to validate")
|
||||
pg = env.postgres.create_start("main", tenant_id=tenant_id)
|
||||
with pg.cursor() as cur:
|
||||
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'b'") == 10000
|
||||
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
|
||||
|
||||
Reference in New Issue
Block a user