mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
WIP: add test for storage_sync upload retries
This commit is contained in:
@@ -9,6 +9,9 @@ pub(super) async fn delete_layer(
|
||||
storage: &GenericRemoteStorage,
|
||||
local_layer_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
fail::fail_point!("before-delete-layer", |_| {
|
||||
anyhow::bail!("failpoint before-delete-layer")
|
||||
});
|
||||
async {
|
||||
debug!(
|
||||
"Deleting layer from remote storage: {:?}",
|
||||
|
||||
@@ -5,19 +5,20 @@ import os
|
||||
import shutil
|
||||
import time
|
||||
from pathlib import Path
|
||||
import re
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
assert_no_in_progress_downloads_for_tenant,
|
||||
available_remote_storages,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
from fixtures.utils import query_scalar, wait_until, print_gc_result
|
||||
|
||||
|
||||
#
|
||||
@@ -97,6 +98,15 @@ def test_remote_storage_backup_and_restore(
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
log.info(f"waiting for checkpoint {checkpoint_number} upload")
|
||||
|
||||
# insert upload failpoints to exercise retry code path
|
||||
action = "3*return->off" if checkpoint_number == 0 else "off"
|
||||
pageserver_http.configure_failpoints(
|
||||
[
|
||||
("before-upload-layer", action),
|
||||
("before-upload-index", action),
|
||||
]
|
||||
)
|
||||
# wait until pageserver successfully uploaded a checkpoint to remote storage
|
||||
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
|
||||
log.info(f"upload of checkpoint {checkpoint_number} is done")
|
||||
@@ -160,3 +170,121 @@ def test_remote_storage_backup_and_restore(
|
||||
query_scalar(cur, f"SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};")
|
||||
== f"{data_secret}|{checkpoint_number}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_remote_storage_upload_queue_retries(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
):
    """Exercise the retry path of the pageserver's remote-storage upload queue.

    Steps:
      1. Turn on the ``before-upload-layer``, ``before-upload-index`` and
         ``before-delete-layer`` failpoints with the ``return`` action.
      2. Write data, checkpoint, overwrite the data, then compact and GC so
         that layer uploads, index uploads and layer deletions get scheduled.
      3. Check via the ``pageserver_remote_upload_queue_unfinished_tasks``
         metric that work is stuck in the queue.
      4. Turn the failpoints off and wait for the metric to reach zero.
      5. Wipe the pageserver's local ``tenants`` directory, restart it,
         re-attach the tenant and verify the latest data is readable.
    """
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        # Use this test's own name rather than the name of the sibling
        # backup-and-restore test it was copied from.
        test_name="test_remote_storage_upload_queue_retries",
    )

    env = neon_env_builder.init_start()

    # create tenant with config that will deterministically allow
    # compaction and gc
    tenant_id, timeline_id = env.neon_cli.create_tenant(
        conf={
            # "gc_horizon": f"{1024 ** 2}",
            "checkpoint_distance": f"{1024 ** 2}",
            "compaction_threshold": "1",
            "compaction_target_size": f"{1024 ** 2}",
            # small PITR interval to allow gc
            "pitr_interval": "1 s",
        }
    )

    client = env.pageserver.http_client()

    pg = env.postgres.create_start("main", tenant_id=tenant_id)

    pg.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")

    def configure_storage_sync_failpoints(action: str) -> None:
        """Apply the same failpoint ``action`` to every storage-sync failpoint."""
        client.configure_failpoints(
            [
                ("before-upload-layer", action),
                ("before-upload-index", action),
                ("before-delete-layer", action),
            ]
        )

    def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data: str) -> None:
        """Set ``val`` to ``data`` for ids 1..1000, then wait for the last flush LSN."""
        pg.safe_psql(
            f"""
            INSERT INTO foo (id, val)
            SELECT g, '{data}'
            FROM generate_series(1, 1000) g
            ON CONFLICT (id) DO UPDATE
            SET val = EXCLUDED.val
            """
        )
        wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)

    def get_queued_count() -> int:
        """Return the unfinished-task count reported for this timeline's upload queue."""
        metrics = client.get_metrics()
        matches = re.search(
            rf'^pageserver_remote_upload_queue_unfinished_tasks{{tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\S+)$',
            metrics,
            re.MULTILINE,
        )
        assert matches
        return int(matches[1])

    def assert_queue_drained() -> None:
        """Raise (so that ``wait_until`` keeps polling) while tasks are still queued."""
        assert get_queued_count() == 0

    # let all of them queue up
    configure_storage_sync_failpoints("return")

    # create initial set of layers & upload them with failpoints configured
    overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
    client.timeline_checkpoint(tenant_id, timeline_id)
    # now overwrite it again
    overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b")
    # trigger layer deletion by doing Compaction, then GC
    client.timeline_compact(tenant_id, timeline_id)
    gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
    print_gc_result(gc_result)
    # FIXME why doesn't this assertion work? I think GC is happening....
    # assert gc_result["layers_removed"] > 0

    # ensure that operations have queued up
    queued_count = get_queued_count()
    log.info(f"queued_count={queued_count}")
    assert queued_count > 0

    # unblock all operations and wait for them to finish.
    # NOTE(review): wait_until appears to retry only while the callable raises
    # (tenant_active below relies on that), so the condition must be an
    # assertion; a callable that merely returns False would end the wait on
    # the first poll. Confirm against fixtures.utils.wait_until.
    configure_storage_sync_failpoints("off")
    wait_until(10, 1, assert_queue_drained)

    # try a restore to verify that the uploads worked
    # XXX: should vary this test to selectively fail just layer uploads, index uploads, deletions
    #      but how do we validate the result after restore?

    env.pageserver.stop(immediate=True)
    env.postgres.stop_all()

    # Remove all local tenant data so it has to come back from remote storage.
    dir_to_clear = Path(env.repo_dir) / "tenants"
    shutil.rmtree(dir_to_clear)
    os.mkdir(dir_to_clear)

    env.pageserver.start()
    client = env.pageserver.http_client()

    client.tenant_attach(tenant_id)

    def tenant_active() -> None:
        """Raise until the attached tenant has no downloads in progress and is Active."""
        all_states = client.tenant_list()
        [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
        assert tenant["has_in_progress_downloads"] is False
        assert tenant["state"] == {"Active": {"background_jobs_running": True}}

    wait_until(5, 1, tenant_active)

    log.info("restarting postgres to validate")
    pg = env.postgres.create_start("main", tenant_id=tenant_id)
    with pg.cursor() as cur:
        assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'b'") == 1000
|
||||
|
||||
Reference in New Issue
Block a user