From b7ff5cfca7b2f0833a98f98dae5af3a86d72d4eb Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 24 Oct 2023 11:23:36 +0100 Subject: [PATCH] tests: fix test_compaction_delete_before_upload --- .../tenant/remote_timeline_client/upload.rs | 2 + test_runner/regress/test_remote_storage.py | 84 +++++++++++++------ 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 90e603deb0..03ba137566 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -60,6 +60,8 @@ pub(super) async fn upload_timeline_layer<'a>( bail!("failpoint before-upload-layer") }); + pausable_failpoint!("before-upload-layer-pausable"); + let storage_path = remote_path(conf, source_path, generation)?; let source_file_res = fs::File::open(&source_path).await; let source_file = match source_file_res { diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index ecd02c4c16..3dcece4fcf 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -757,12 +757,14 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv create_thread.join() -# Regression test for a race condition where L0 layers are compacted before the upload, -# resulting in the uploading complaining about the file not being found -# https://github.com/neondatabase/neon/issues/4526 -def test_compaction_delete_before_upload( +def test_compaction_waits_for_upload( neon_env_builder: NeonEnvBuilder, ): + """ + Compaction waits for outstanding uploads to complete, so that it avoids deleting layer + files that have not yet been uploaded. This test forces a race between upload and + compaction. 
 + """ neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) env = neon_env_builder.init_start( @@ -792,50 +794,82 @@ def test_compaction_delete_before_upload( wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) # Now make the flushing hang and update one small piece of data - client.configure_failpoints(("flush-frozen-pausable", "pause")) + client.configure_failpoints(("before-upload-layer-pausable", "pause")) endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - q: queue.Queue[Optional[PageserverApiException]] = queue.Queue() - barrier = threading.Barrier(2) + checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue() + compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue() + compact_barrier = threading.Barrier(2) def checkpoint_in_background(): - barrier.wait() try: + log.info("Checkpoint starting") client.timeline_checkpoint(tenant_id, timeline_id) - q.put(None) + log.info("Checkpoint complete") + checkpoint_result.put(None) except PageserverApiException as e: - q.put(e) + log.info(f"Checkpoint errored: {e}") + checkpoint_result.put(e) - create_thread = threading.Thread(target=checkpoint_in_background) - create_thread.start() + def compact_in_background(): + compact_barrier.wait() + try: + log.info("Compaction starting") + client.timeline_compact(tenant_id, timeline_id) + log.info("Compaction complete") + compact_result.put(None) + except PageserverApiException as e: + log.info(f"Compaction errored: {e}") + compact_result.put(e) + + checkpoint_thread = threading.Thread(target=checkpoint_in_background) + checkpoint_thread.start() + + compact_thread = threading.Thread(target=compact_in_background) + compact_thread.start() try: - barrier.wait() + # Start the checkpoint, see that it blocks + log.info("Waiting to see checkpoint hang...") + time.sleep(5) + assert checkpoint_result.empty() - time.sleep(4) - 
client.timeline_compact(tenant_id, timeline_id) + # Start the compaction, see that it finds work to do but blocks + compact_barrier.wait() + log.info("Waiting to see compaction hang...") + time.sleep(5) + assert compact_barrier.n_waiting == 0 + assert compact_result.empty() - client.configure_failpoints(("flush-frozen-pausable", "off")) + # This is logged once compaction is started, but before we wait for operations to complete + assert env.pageserver.log_contains("compact_level0_phase1 stats available.") - conflict = q.get() + # Once we unblock uploads the compaction should complete successfully + log.info("Disabling failpoint") + client.configure_failpoints(("before-upload-layer-pausable", "off")) + log.info("Awaiting compaction result") + assert compact_result.get(timeout=10) is None + log.info("Awaiting checkpoint result") + assert checkpoint_result.get(timeout=10) is None - assert conflict is None + except Exception: + # Log the actual failure's backtrace here, before we proceed to join threads + log.exception("Failure, cleaning up...") + raise finally: - create_thread.join() + compact_barrier.abort() - # Add a delay for the uploads to run into either the file not found or the - time.sleep(4) + checkpoint_thread.join() + compact_thread.join() # Ensure that this actually terminates wait_upload_queue_empty(client, tenant_id, timeline_id) - # For now we are hitting this message. - # Maybe in the future the underlying race condition will be fixed, - # but until then, ensure that this message is hit instead. - assert env.pageserver.log_contains( + # We should not have hit the error handling path in uploads where the remote file is gone + assert not env.pageserver.log_contains( "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." )