From a0b862a8bd63bfbad4cf509774e47436ec79ebf0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 24 Oct 2023 13:57:01 +0100 Subject: [PATCH] pageserver: schedule frozen layer uploads inside the layers lock (#5639) ## Problem Compaction's source of truth for what layers exist is the LayerManager. `flush_frozen_layer` updates LayerManager before it has scheduled upload of the frozen layer. Compaction can then "see" the new layer, decide to delete it, schedule uploads of replacement layers, all before `flush_frozen_layer` wakes up again and schedules the upload. When the upload is scheduled, the local layer file may be gone, in which case we end up with no such layer in remote storage, but an entry still added to IndexPart pointing to the missing layer. ## Summary of changes Schedule layer uploads inside the `self.layers` lock, so that whenever a frozen layer is present in LayerManager, it is also present in RemoteTimelineClient's metadata. Closes: #5635 --- .../tenant/remote_timeline_client/upload.rs | 2 + pageserver/src/tenant/timeline.rs | 52 ++++++++---- test_runner/regress/test_remote_storage.py | 83 +++++++++++++------ 3 files changed, 94 insertions(+), 43 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 90e603deb0..03ba137566 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -60,6 +60,8 @@ pub(super) async fn upload_timeline_layer<'a>( bail!("failpoint before-upload-layer") }); + pausable_failpoint!("before-upload-layer-pausable"); + let storage_path = remote_path(conf, source_path, generation)?; let source_file_res = fs::File::open(&source_path).await; let source_file = match source_file_res { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2c76155e2a..8ce6aa2108 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2793,10 +2793,13 @@ impl Timeline { ) }; + let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1); + let old_disk_consistent_lsn = self.disk_consistent_lsn.load(); + // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. The flushed layer is stored in // the mapping in `create_delta_layer`. - { + let metadata = { let mut guard = self.layers.write().await; if let Some(ref l) = delta_layer_to_add { @@ -2812,8 +2815,17 @@ impl Timeline { } guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer); + if disk_consistent_lsn != old_disk_consistent_lsn { + assert!(disk_consistent_lsn > old_disk_consistent_lsn); + self.disk_consistent_lsn.store(disk_consistent_lsn); + + // Schedule remote uploads that will reflect our new disk_consistent_lsn + Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?) + } else { + None + } // release lock on 'layers' - } + }; // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, // a compaction can delete the file and then it won't be available for uploads any more. @@ -2829,28 +2841,22 @@ impl Timeline { // // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing // *all* the layers, to avoid fsyncing the file multiple times. - let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1); - let old_disk_consistent_lsn = self.disk_consistent_lsn.load(); - // If we were able to advance 'disk_consistent_lsn', save it the metadata file. - // After crash, we will restart WAL streaming and processing from that point. - if disk_consistent_lsn != old_disk_consistent_lsn { - assert!(disk_consistent_lsn > old_disk_consistent_lsn); - self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload) + // If we updated our disk_consistent_lsn, persist the updated metadata to local disk. + if let Some(metadata) = metadata { + save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata) .await - .context("update_metadata_file")?; - // Also update the in-memory copy - self.disk_consistent_lsn.store(disk_consistent_lsn); + .context("save_metadata")?; } Ok(()) } /// Update metadata file - async fn update_metadata_file( + fn schedule_uploads( &self, disk_consistent_lsn: Lsn, layer_paths_to_upload: HashMap, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { // We can only save a valid 'prev_record_lsn' value on disk if we // flushed *all* in-memory changes to disk. We only track // 'prev_record_lsn' in memory for the latest processed record, so we @@ -2887,10 +2893,6 @@ impl Timeline { x.unwrap() )); - save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata) - .await - .context("save_metadata")?; - if let Some(remote_client) = &self.remote_client { for (path, layer_metadata) in layer_paths_to_upload { remote_client.schedule_layer_file_upload(&path, &layer_metadata)?; @@ -2898,6 +2900,20 @@ impl Timeline { remote_client.schedule_index_upload_for_metadata_update(&metadata)?; } + Ok(metadata) + } + + async fn update_metadata_file( + &self, + disk_consistent_lsn: Lsn, + layer_paths_to_upload: HashMap, + ) -> anyhow::Result<()> { + let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?; + + save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata) + .await + .context("save_metadata")?; + Ok(()) } diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index ecd02c4c16..f4bf9207b0 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -757,12 +757,14 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv create_thread.join() -# Regression test for a race condition where L0 layers are compacted before the upload, -# resulting in the uploading complaining about the file not being found -# https://github.com/neondatabase/neon/issues/4526 -def test_compaction_delete_before_upload( +def test_compaction_waits_for_upload( neon_env_builder: NeonEnvBuilder, ): + """ + Compaction waits for outstanding uploads to complete, so that it avoids deleting layers + files that have not yet been uploaded. This test forces a race between upload and + compaction. + """ neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) env = neon_env_builder.init_start( @@ -792,50 +794,81 @@ def test_compaction_delete_before_upload( wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) # Now make the flushing hang and update one small piece of data - client.configure_failpoints(("flush-frozen-pausable", "pause")) + client.configure_failpoints(("before-upload-layer-pausable", "pause")) endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - q: queue.Queue[Optional[PageserverApiException]] = queue.Queue() - barrier = threading.Barrier(2) + checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue() + compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue() + compact_barrier = threading.Barrier(2) def checkpoint_in_background(): - barrier.wait() try: + log.info("Checkpoint starting") client.timeline_checkpoint(tenant_id, timeline_id) - q.put(None) + log.info("Checkpoint complete") + checkpoint_result.put(None) except PageserverApiException as e: - q.put(e) + log.info("Checkpoint errored: {e}") + checkpoint_result.put(e) - create_thread = threading.Thread(target=checkpoint_in_background) - create_thread.start() + def compact_in_background(): + compact_barrier.wait() + try: + log.info("Compaction starting") + client.timeline_compact(tenant_id, timeline_id) + log.info("Compaction complete") + compact_result.put(None) + except PageserverApiException as e: + log.info("Compaction errored: {e}") + compact_result.put(e) + + checkpoint_thread = threading.Thread(target=checkpoint_in_background) + checkpoint_thread.start() + + compact_thread = threading.Thread(target=compact_in_background) + compact_thread.start() try: - barrier.wait() + # Start the checkpoint, see that it blocks + log.info("Waiting to see checkpoint hang...") + time.sleep(5) + assert checkpoint_result.empty() - time.sleep(4) - client.timeline_compact(tenant_id, timeline_id) + # Start the compaction, see that it finds work to do but blocks + compact_barrier.wait() + log.info("Waiting to see compaction hang...") + time.sleep(5) + assert compact_result.empty() - client.configure_failpoints(("flush-frozen-pausable", "off")) + # This is logged once compaction is started, but before we wait for operations to complete + assert env.pageserver.log_contains("compact_level0_phase1 stats available.") - conflict = q.get() + # Once we unblock uploads the compaction should complete successfully + log.info("Disabling failpoint") + client.configure_failpoints(("before-upload-layer-pausable", "off")) + log.info("Awaiting compaction result") + assert compact_result.get(timeout=10) is None + log.info("Awaiting checkpoint result") + assert checkpoint_result.get(timeout=10) is None - assert conflict is None + except Exception: + # Log the actual failure's backtrace here, before we proceed to join threads + log.exception("Failure, cleaning up...") + raise finally: - create_thread.join() + compact_barrier.abort() - # Add a delay for the uploads to run into either the file not found or the - time.sleep(4) + checkpoint_thread.join() + compact_thread.join() # Ensure that this actually terminates wait_upload_queue_empty(client, tenant_id, timeline_id) - # For now we are hitting this message. - # Maybe in the future the underlying race condition will be fixed, - # but until then, ensure that this message is hit instead. - assert env.pageserver.log_contains( + # We should not have hit the error handling path in uploads where the remote file is gone + assert not env.pageserver.log_contains( "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." )