tests: fix test_compaction_delete_before_upload

This commit is contained in:
John Spray
2023-10-24 11:23:36 +01:00
parent 96afb8327b
commit b7ff5cfca7
2 changed files with 61 additions and 25 deletions

View File

@@ -60,6 +60,8 @@ pub(super) async fn upload_timeline_layer<'a>(
bail!("failpoint before-upload-layer")
});
pausable_failpoint!("before-upload-layer-pausable");
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res {

View File

@@ -757,12 +757,14 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
create_thread.join()
# Regression test for a race condition where L0 layers are compacted before the upload,
# resulting in the uploading complaining about the file not being found
# https://github.com/neondatabase/neon/issues/4526
def test_compaction_delete_before_upload(
def test_compaction_waits_for_upload(
neon_env_builder: NeonEnvBuilder,
):
"""
Compaction waits for outstanding uploads to complete, so that it avoids deleting layers
files that have not yet been uploaded. This test forces a race between upload and
compaction.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
@@ -792,50 +794,82 @@ def test_compaction_delete_before_upload(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# Now make the flushing hang and update one small piece of data
client.configure_failpoints(("flush-frozen-pausable", "pause"))
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_barrier = threading.Barrier(2)
def checkpoint_in_background():
barrier.wait()
try:
log.info("Checkpoint starting")
client.timeline_checkpoint(tenant_id, timeline_id)
q.put(None)
log.info("Checkpoint complete")
checkpoint_result.put(None)
except PageserverApiException as e:
q.put(e)
log.info("Checkpoint errored: {e}")
checkpoint_result.put(e)
create_thread = threading.Thread(target=checkpoint_in_background)
create_thread.start()
def compact_in_background():
compact_barrier.wait()
try:
log.info("Compaction starting")
client.timeline_compact(tenant_id, timeline_id)
log.info("Compaction complete")
compact_result.put(None)
except PageserverApiException as e:
log.info("Compaction errored: {e}")
compact_result.put(e)
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
checkpoint_thread.start()
compact_thread = threading.Thread(target=compact_in_background)
compact_thread.start()
try:
barrier.wait()
# Start the checkpoint, see that it blocks
log.info("Waiting to see checkpoint hang...")
time.sleep(5)
assert checkpoint_result.empty()
time.sleep(4)
client.timeline_compact(tenant_id, timeline_id)
# Start the compaction, see that it finds work to do but blocks
compact_barrier.wait()
log.info("Waiting to see compaction hang...")
time.sleep(5)
assert compact_barrier.n_waiting == 0
assert compact_result.empty()
client.configure_failpoints(("flush-frozen-pausable", "off"))
# This is logged once compaction is started, but before we wait for operations to complete
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
conflict = q.get()
# Once we unblock uploads the compaction should complete successfully
log.info("Disabling failpoint")
client.configure_failpoints(("before-upload-layer-pausable", "off"))
log.info("Awaiting compaction result")
assert compact_result.get(timeout=10) is None
log.info("Awaiting checkpoint result")
assert checkpoint_result.get(timeout=10) is None
assert conflict is None
except Exception:
# Log the actual failure's backtrace here, before we proceed to join threads
log.exception("Failure, cleaning up...")
raise
finally:
create_thread.join()
compact_barrier.abort()
# Add a delay for the uploads to run into either the file not found or the
time.sleep(4)
checkpoint_thread.join()
compact_thread.join()
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
# For now we are hitting this message.
# Maybe in the future the underlying race condition will be fixed,
# but until then, ensure that this message is hit instead.
assert env.pageserver.log_contains(
# We should not have hit the error handling path in uploads where the remote file is gone
assert not env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)