diff --git a/pageserver/src/storage_sync/delete.rs b/pageserver/src/storage_sync/delete.rs index 88567ef624..99b013f96b 100644 --- a/pageserver/src/storage_sync/delete.rs +++ b/pageserver/src/storage_sync/delete.rs @@ -9,6 +9,9 @@ pub(super) async fn delete_layer( storage: &GenericRemoteStorage, local_layer_path: &Path, ) -> anyhow::Result<()> { + fail::fail_point!("before-delete-layer", |_| { + anyhow::bail!("failpoint before-delete-layer") + }); async { debug!( "Deleting layer from remote storage: {:?}", diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 80f9215a96..c8fe19d01e 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -5,19 +5,20 @@ import os import shutil import time from pathlib import Path +import re import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, RemoteStorageKind, - assert_no_in_progress_downloads_for_tenant, available_remote_storages, wait_for_last_record_lsn, + wait_for_last_flush_lsn, wait_for_upload, ) from fixtures.types import Lsn, TenantId, TimelineId -from fixtures.utils import query_scalar, wait_until +from fixtures.utils import query_scalar, wait_until, print_gc_result # @@ -97,6 +98,15 @@ def test_remote_storage_backup_and_restore( pageserver_http.timeline_checkpoint(tenant_id, timeline_id) log.info(f"waiting for checkpoint {checkpoint_number} upload") + + # insert upload failpoints to exercise retry code path + action = "3*return->off" if checkpoint_number == 0 else "off" + pageserver_http.configure_failpoints( + [ + ("before-upload-layer", action), + ("before-upload-index", action), + ] + ) # wait until pageserver successfully uploaded a checkpoint to remote storage wait_for_upload(client, tenant_id, timeline_id, current_lsn) log.info(f"upload of checkpoint {checkpoint_number} is done") @@ -160,3 +170,121 @@ def test_remote_storage_backup_and_restore( query_scalar(cur, f"SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};") == f"{data_secret}|{checkpoint_number}" ) + + +@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) +def test_remote_storage_upload_queue_retries( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, +): + + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_remote_storage_backup_and_restore", + ) + + env = neon_env_builder.init_start() + + # create tenant with config that will determinstically allow + # compaction and gc + tenant_id, timeline_id = env.neon_cli.create_tenant( + conf={ + # "gc_horizon": f"{1024 ** 2}", + "checkpoint_distance": f"{1024 ** 2}", + "compaction_threshold": "1", + "compaction_target_size": f"{1024 ** 2}", + # small PITR interval to allow gc + "pitr_interval": "1 s", + } + ) + + client = env.pageserver.http_client() + + pg = env.postgres.create_start("main", tenant_id=tenant_id) + + pg.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") + + def configure_storage_sync_failpoints(action): + client.configure_failpoints( + [ + ("before-upload-layer", action), + ("before-upload-index", action), + ("before-delete-layer", action), + ] + ) + + def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data): + # create initial set of layers & upload them with failpoints configured + pg.safe_psql( + f""" + INSERT INTO foo (id, val) + SELECT g, '{data}' + FROM generate_series(1, 1000) g + ON CONFLICT (id) DO UPDATE + SET val = EXCLUDED.val + """ + ) + wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + # let all of them queue up + configure_storage_sync_failpoints("return") + + overwrite_data_and_wait_for_it_to_arrive_at_pageserver('a') + client.timeline_checkpoint(tenant_id, timeline_id) + # now overwrite it again + overwrite_data_and_wait_for_it_to_arrive_at_pageserver('b') + # trigger layer deletion by doing Compaction, then GC + client.timeline_compact(tenant_id, timeline_id) + gc_result = client.timeline_gc(tenant_id, timeline_id, 0) + print_gc_result(gc_result) + # FIXME why doesn't this assertion work? I think GC is happening.... + #assert gc_result["layers_removed"] > 0 + + # confirm all operations are queued up + def get_queued_count(): + metrics = client.get_metrics() + matches = re.search( + f'^pageserver_remote_upload_queue_unfinished_tasks{{tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$', + metrics, + re.MULTILINE, + ) + assert matches + return int(matches[1]) + + # ensure that operations have queued up + queued_count = get_queued_count() + log.info(f"queued_count={queued_count}") + assert queued_count > 0 + + # unblock all operations and wait for them to finish + configure_storage_sync_failpoints("off") + wait_until(10, 1, lambda: get_queued_count() == 0) + + # try a restore to verify that the uploads worked + # XXX: should vary this test to selectively fail just layer uploads, index uploads, deletions + # but how do we validate the result after restore? + + env.pageserver.stop(immediate=True) + env.postgres.stop_all() + + dir_to_clear = Path(env.repo_dir) / "tenants" + shutil.rmtree(dir_to_clear) + os.mkdir(dir_to_clear) + + env.pageserver.start() + client = env.pageserver.http_client() + + client.tenant_attach(tenant_id) + + def tenant_active(): + all_states = client.tenant_list() + [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id] + assert tenant["has_in_progress_downloads"] == False + assert tenant["state"] == {"Active": {"background_jobs_running": True}} + + wait_until(5, 1, tenant_active) + + log.info("restarting postgres to validate") + pg = env.postgres.create_start("main", tenant_id=tenant_id) + with pg.cursor() as cur: + assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'b'") == 1000