diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 83096faa9c..4c504527a0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -84,6 +84,25 @@ use utils::{ lsn::{Lsn, RecordLsn}, }; +/// Declare a failpoint that can use the `pause` failpoint action. +/// We don't want to block the executor thread, hence, spawn_blocking + await. +macro_rules! pausable_failpoint { + ($name:literal) => { + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint {}", $name); + fail::fail_point!($name); + } + }) + .await + .expect("spawn_blocking"); + } + }; +} + pub mod blob_io; pub mod block_io; pub mod disk_btree; @@ -1498,20 +1517,7 @@ impl Tenant { remote_client.delete_all().await.context("delete_all")? }; - // Have a failpoint that can use the `pause` failpoint action. - // We don't want to block the executor thread, hence, spawn_blocking + await. - if cfg!(feature = "testing") { - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!("at failpoint in_progress_delete"); - fail::fail_point!("in_progress_delete"); - } - }) - .await - .expect("spawn_blocking"); - } + pausable_failpoint!("in_progress_delete"); { // Remove the timeline from the map. diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index d5468b43d0..8c04794e92 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -748,20 +748,8 @@ impl RemoteTimelineClient { stopped.deleted_at = SetDeletedFlagProgress::NotRunning; }); - // Have a failpoint that can use the `pause` failpoint action. - // We don't want to block the executor thread, hence, spawn_blocking + await. - if cfg!(feature = "testing") { - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!("at failpoint persist_deleted_index_part"); - fail::fail_point!("persist_deleted_index_part"); - } - }) - .await - .expect("spawn_blocking"); - } + pausable_failpoint!("persist_deleted_index_part"); + upload::upload_index_part( self.conf, &self.storage_impl, diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 098c661622..0178ef520c 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -2,7 +2,7 @@ use anyhow::{bail, Context}; use fail::fail_point; -use std::path::Path; +use std::{io::ErrorKind, path::Path}; use tokio::fs; use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart}; @@ -11,6 +11,8 @@ use utils::id::{TenantId, TimelineId}; use super::index::LayerFileMetadata; +use tracing::info; + /// Serializes and uploads the given index part data to the remote storage. pub(super) async fn upload_index_part<'a>( conf: &'static PageServerConf, @@ -56,9 +58,22 @@ pub(super) async fn upload_timeline_layer<'a>( }); let storage_path = conf.remote_path(source_path)?; - let source_file = fs::File::open(&source_path) - .await - .with_context(|| format!("Failed to open a source file for layer {source_path:?}"))?; + let source_file_res = fs::File::open(&source_path).await; + let source_file = match source_file_res { + Ok(source_file) => source_file, + Err(e) if e.kind() == ErrorKind::NotFound => { + // In some situations we might run into the underlying file being deleted by + // e.g. compaction before the uploader gets to it. In that instance, we don't + // want to retry the error: a deleted file won't come back. In theory, the + // file might not have been written in the first place, which also indicates + // a bug. Still log the situation so that we can keep an eye on it. + // See https://github.com/neondatabase/neon/issues/4526 + info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."); + return Ok(()); + } + Err(e) => Err(e) + .with_context(|| format!("Failed to open a source file for layer {source_path:?}"))?, + }; let fs_size = source_file .metadata() diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 58a2adbb58..d589f10570 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2918,7 +2918,7 @@ impl Timeline { HashMap::from([(delta_path, metadata)]) }; - fail_point!("flush-frozen-before-sync"); + pausable_failpoint!("flush-frozen-before-sync"); // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. We do not modify `LayerFileManager` because diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 9a70266314..1b6e39ff31 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -777,6 +777,95 @@ def test_empty_branch_remote_storage_upload_on_restart( create_thread.join() +# Regression test for a race condition where files are compactified before the upload, +# resulting in the uploading complaining about the file not being found +# https://github.com/neondatabase/neon/issues/4526 +@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) +def test_compaction_delete_before_upload( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_compaction_delete_before_upload", + ) + + env = neon_env_builder.init_start() + + # create tenant with config that will determinstically allow + # compaction and disables gc + tenant_id, timeline_id = env.neon_cli.create_tenant( + conf={ + # Set a small compaction threshold + "compaction_threshold": "3", + # Disable GC + "gc_period": "0s", + # disable PITR + "pitr_interval": "0s", + } + ) + + client = env.pageserver.http_client() + + with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: + # Build two tables with some data inside + endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)") + wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + + client.timeline_checkpoint(tenant_id, timeline_id) + + endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)") + wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + + # Now make the flushing hang and update one small piece of data + client.configure_failpoints(("flush-frozen-before-sync", "pause")) + + endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") + + wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + + q: queue.Queue[Optional[PageserverApiException]] = queue.Queue() + barrier = threading.Barrier(2) + + def checkpoint_in_background(): + barrier.wait() + try: + client.timeline_checkpoint(tenant_id, timeline_id) + q.put(None) + except PageserverApiException as e: + q.put(e) + + create_thread = threading.Thread(target=checkpoint_in_background) + create_thread.start() + + try: + barrier.wait() + + time.sleep(4) + client.timeline_compact(tenant_id, timeline_id) + + client.configure_failpoints(("flush-frozen-before-sync", "off")) + + conflict = q.get() + + assert conflict is None + finally: + create_thread.join() + + # Add a delay for the uploads to run into either the file not found or the + time.sleep(4) + + # Ensure that this actually terminates + wait_upload_queue_empty(client, tenant_id, timeline_id) + + # For now we are hitting this message. + # Maybe in the future the underlying race condition will be fixed, + # but until then, ensure that this message is hit instead. + assert env.pageserver.log_contains( + "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." + ) + + def wait_upload_queue_empty( client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId ):