diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index 90e603deb0..03ba137566 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -60,6 +60,8 @@ pub(super) async fn upload_timeline_layer<'a>(
         bail!("failpoint before-upload-layer")
     });
 
+    pausable_failpoint!("before-upload-layer-pausable");
+
     let storage_path = remote_path(conf, source_path, generation)?;
     let source_file_res = fs::File::open(&source_path).await;
     let source_file = match source_file_res {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 2c76155e2a..8ce6aa2108 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -2793,10 +2793,13 @@ impl Timeline {
             )
         };
 
+        let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
+        let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
+
         // The new on-disk layers are now in the layer map. We can remove the
         // in-memory layer from the map now. The flushed layer is stored in
         // the mapping in `create_delta_layer`.
-        {
+        let metadata = {
             let mut guard = self.layers.write().await;
 
             if let Some(ref l) = delta_layer_to_add {
@@ -2812,8 +2815,17 @@ impl Timeline {
             }
 
             guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
+            if disk_consistent_lsn != old_disk_consistent_lsn {
+                assert!(disk_consistent_lsn > old_disk_consistent_lsn);
+                self.disk_consistent_lsn.store(disk_consistent_lsn);
+
+                // Schedule remote uploads that will reflect our new disk_consistent_lsn
+                Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?)
+            } else {
+                None
+            }
             // release lock on 'layers'
-        }
+        };
 
         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
         // a compaction can delete the file and then it won't be available for uploads any more.
@@ -2829,28 +2841,22 @@ impl Timeline {
         //
         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
         // *all* the layers, to avoid fsyncing the file multiple times.
-        let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
-        let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
-        // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
-        // After crash, we will restart WAL streaming and processing from that point.
-        if disk_consistent_lsn != old_disk_consistent_lsn {
-            assert!(disk_consistent_lsn > old_disk_consistent_lsn);
-            self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
+        // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
+        if let Some(metadata) = metadata {
+            save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
                 .await
-                .context("update_metadata_file")?;
-            // Also update the in-memory copy
-            self.disk_consistent_lsn.store(disk_consistent_lsn);
+                .context("save_metadata")?;
         }
 
         Ok(())
     }
 
     /// Update metadata file
-    async fn update_metadata_file(
+    fn schedule_uploads(
         &self,
         disk_consistent_lsn: Lsn,
         layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<TimelineMetadata> {
         // We can only save a valid 'prev_record_lsn' value on disk if we
         // flushed *all* in-memory changes to disk. We only track
         // 'prev_record_lsn' in memory for the latest processed record, so we
@@ -2887,10 +2893,6 @@ impl Timeline {
             x.unwrap()
         ));
 
-        save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
-            .await
-            .context("save_metadata")?;
-
         if let Some(remote_client) = &self.remote_client {
             for (path, layer_metadata) in layer_paths_to_upload {
                 remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
@@ -2898,6 +2900,20 @@ impl Timeline {
             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
         }
 
+        Ok(metadata)
+    }
+
+    async fn update_metadata_file(
+        &self,
+        disk_consistent_lsn: Lsn,
+        layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
+    ) -> anyhow::Result<()> {
+        let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?;
+
+        save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
+            .await
+            .context("save_metadata")?;
+
         Ok(())
     }
 
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index ecd02c4c16..f4bf9207b0 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -757,12 +757,14 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
     create_thread.join()
 
 
-# Regression test for a race condition where L0 layers are compacted before the upload,
-# resulting in the uploading complaining about the file not being found
-# https://github.com/neondatabase/neon/issues/4526
-def test_compaction_delete_before_upload(
+def test_compaction_waits_for_upload(
     neon_env_builder: NeonEnvBuilder,
 ):
+    """
+    Compaction waits for outstanding uploads to complete, so that it avoids deleting layer
+    files that have not yet been uploaded. This test forces a race between upload and
+    compaction.
+    """
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
 
     env = neon_env_builder.init_start(
@@ -792,50 +794,81 @@ def test_compaction_delete_before_upload(
     wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
 
     # Now make the flushing hang and update one small piece of data
-    client.configure_failpoints(("flush-frozen-pausable", "pause"))
+    client.configure_failpoints(("before-upload-layer-pausable", "pause"))
 
     endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
 
     wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
 
-    q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
-    barrier = threading.Barrier(2)
+    checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
+    compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
+    compact_barrier = threading.Barrier(2)
 
     def checkpoint_in_background():
-        barrier.wait()
         try:
+            log.info("Checkpoint starting")
             client.timeline_checkpoint(tenant_id, timeline_id)
-            q.put(None)
+            log.info("Checkpoint complete")
+            checkpoint_result.put(None)
         except PageserverApiException as e:
-            q.put(e)
+            log.info(f"Checkpoint errored: {e}")
+            checkpoint_result.put(e)
 
-    create_thread = threading.Thread(target=checkpoint_in_background)
-    create_thread.start()
+    def compact_in_background():
+        compact_barrier.wait()
+        try:
+            log.info("Compaction starting")
+            client.timeline_compact(tenant_id, timeline_id)
+            log.info("Compaction complete")
+            compact_result.put(None)
+        except PageserverApiException as e:
+            log.info(f"Compaction errored: {e}")
+            compact_result.put(e)
+
+    checkpoint_thread = threading.Thread(target=checkpoint_in_background)
+    checkpoint_thread.start()
+
+    compact_thread = threading.Thread(target=compact_in_background)
+    compact_thread.start()
 
     try:
-        barrier.wait()
+        # Start the checkpoint, see that it blocks
+        log.info("Waiting to see checkpoint hang...")
+        time.sleep(5)
+        assert checkpoint_result.empty()
 
-        time.sleep(4)
-        client.timeline_compact(tenant_id, timeline_id)
+        # Start the compaction, see that it finds work to do but blocks
+        compact_barrier.wait()
+        log.info("Waiting to see compaction hang...")
+        time.sleep(5)
+        assert compact_result.empty()
 
-        client.configure_failpoints(("flush-frozen-pausable", "off"))
+        # This is logged once compaction is started, but before we wait for operations to complete
+        assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
 
-        conflict = q.get()
+        # Once we unblock uploads the compaction should complete successfully
+        log.info("Disabling failpoint")
+        client.configure_failpoints(("before-upload-layer-pausable", "off"))
+        log.info("Awaiting compaction result")
+        assert compact_result.get(timeout=10) is None
+        log.info("Awaiting checkpoint result")
+        assert checkpoint_result.get(timeout=10) is None
 
-        assert conflict is None
+    except Exception:
+        # Log the actual failure's backtrace here, before we proceed to join threads
+        log.exception("Failure, cleaning up...")
+        raise
     finally:
-        create_thread.join()
+        compact_barrier.abort()
 
-    # Add a delay for the uploads to run into either the file not found or the
-    time.sleep(4)
+        checkpoint_thread.join()
+        compact_thread.join()
 
     # Ensure that this actually terminates
    wait_upload_queue_empty(client, tenant_id, timeline_id)
 
-    # For now we are hitting this message.
-    # Maybe in the future the underlying race condition will be fixed,
-    # but until then, ensure that this message is hit instead.
-    assert env.pageserver.log_contains(
+    # We should not have hit the error handling path in uploads where the remote file is gone
+    assert not env.pageserver.log_contains(
        "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
     )
 