diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs index c2b6ec6c45..3d02387c98 100644 --- a/pageserver/src/deletion_queue/deleter.rs +++ b/pageserver/src/deletion_queue/deleter.rs @@ -10,12 +10,12 @@ use remote_storage::GenericRemoteStorage; use remote_storage::RemotePath; use remote_storage::TimeoutOrCancel; use remote_storage::MAX_KEYS_PER_DELETE; -use utils::pausable_failpoint; use std::time::Duration; use tokio_util::sync::CancellationToken; use tracing::info; use tracing::warn; use utils::backoff; +use utils::pausable_failpoint; use crate::metrics; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 94f42c7827..e78fc5cef8 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -822,7 +822,7 @@ impl RemoteTimelineClient { self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue)) + Some(self.schedule_barrier0(upload_queue, false)) } }; @@ -861,7 +861,7 @@ impl RemoteTimelineClient { self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue)) + Some(self.schedule_barrier0(upload_queue, false)) } }; @@ -899,7 +899,9 @@ impl RemoteTimelineClient { match (current, uploaded) { (x, y) if wanted(x) && wanted(y) => None, - (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)), + (x, y) if wanted(x) && !wanted(y) => { + Some(self.schedule_barrier0(upload_queue, false)) + } // Usual case: !wanted(x) && !wanted(y) // // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to @@ -916,7 +918,7 @@ impl RemoteTimelineClient { .map(|x| x.with_reason(reason)) .or_else(|| Some(index::GcBlocking::started_now_for(reason))); self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue)) + Some(self.schedule_barrier0(upload_queue, false)) } } }; @@ -959,7 +961,9 @@ impl RemoteTimelineClient { match (current, uploaded) { (x, y) if wanted(x) && wanted(y) => None, - (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)), + (x, y) if wanted(x) && !wanted(y) => { + Some(self.schedule_barrier0(upload_queue, false)) + } (x, y) => { if !wanted(x) && wanted(y) { warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)"); @@ -970,7 +974,7 @@ impl RemoteTimelineClient { assert!(wanted(upload_queue.dirty.gc_blocking.as_ref())); // FIXME: bogus ? self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue)) + Some(self.schedule_barrier0(upload_queue, false)) } } }; @@ -1226,7 +1230,7 @@ impl RemoteTimelineClient { let upload_queue = guard .initialized_mut() .map_err(WaitCompletionError::NotInitialized)?; - self.schedule_barrier0(upload_queue) + self.schedule_barrier0(upload_queue, false) }; Self::wait_completion0(receiver).await @@ -1242,19 +1246,20 @@ impl RemoteTimelineClient { Ok(()) } - pub(crate) fn schedule_barrier(self: &Arc) -> anyhow::Result<()> { + pub(crate) fn schedule_barrier(self: &Arc, initial_barrier: bool) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - self.schedule_barrier0(upload_queue); + self.schedule_barrier0(upload_queue, initial_barrier); Ok(()) } fn schedule_barrier0( self: &Arc, upload_queue: &mut UploadQueueInitialized, + initial_barrier: bool, ) -> tokio::sync::watch::Receiver<()> { let (sender, receiver) = tokio::sync::watch::channel(()); - let barrier_op = UploadOp::Barrier(sender); + let barrier_op = UploadOp::Barrier(sender, initial_barrier); upload_queue.queued_operations.push_back(barrier_op); // Don't count this kind of operation! @@ -1723,8 +1728,8 @@ impl RemoteTimelineClient { // Can we run this task now? let can_run_now = match next_op { UploadOp::UploadLayer(..) => { - // Can always be scheduled. - true + // Can always be scheduled except when there's a barrier + upload_queue.num_inprogress_barriers == 0 } UploadOp::UploadMetadata { .. } => { // These can only be performed after all the preceding operations @@ -1736,7 +1741,7 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() } - UploadOp::Barrier(_) | UploadOp::Shutdown => { + UploadOp::Barrier(_, _) | UploadOp::Shutdown => { upload_queue.inprogress_tasks.is_empty() } }; @@ -1774,10 +1779,15 @@ impl RemoteTimelineClient { UploadOp::Delete(_) => { upload_queue.num_inprogress_deletions += 1; } - UploadOp::Barrier(sender) => { + UploadOp::Barrier(sender, false) => { + // For other barriers, simply send back the ack. sender.send_replace(()); continue; } + UploadOp::Barrier(_, true) => { + // For initial barrier, we need to wait for deletions. + upload_queue.num_inprogress_barriers += 1; + } UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"), }; @@ -1911,19 +1921,25 @@ impl RemoteTimelineClient { } res } - UploadOp::Delete(delete) => { - pausable_failpoint!("before-delete-layer-pausable"); - self.deletion_queue_client - .push_layers( - self.tenant_shard_id, - self.timeline_id, - self.generation, - delete.layers.clone(), - ) - .await - .map_err(|e| anyhow::anyhow!(e)) - } - unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => { + UploadOp::Delete(delete) => self + .deletion_queue_client + .push_layers( + self.tenant_shard_id, + self.timeline_id, + self.generation, + delete.layers.clone(), + ) + .await + .map_err(|e| anyhow::anyhow!(e)), + // Barrier flushes up the deletion queue. Usually, we don't wait until deletion + // completes before returning from `deletion_queue.push_layers` due to generation + // optimizations, but in case of a barrier scheduled, we need to wait. + UploadOp::Barrier(_, _) => self + .deletion_queue_client + .flush_execute() + .await + .map_err(|e| anyhow::anyhow!(e)), + unexpected @ UploadOp::Shutdown => { // unreachable. Barrier operations are handled synchronously in // launch_queued_tasks warn!("unexpected {unexpected:?} operation in perform_upload_task"); @@ -1933,6 +1949,10 @@ impl RemoteTimelineClient { match upload_result { Ok(()) => { + if let UploadOp::Barrier(sender, _) = &task.op { + // Notify the caller that the barrier has been reached. + sender.send(()).ok(); + } break; } Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => { @@ -2036,7 +2056,11 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_deletions -= 1; None } - UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(), + UploadOp::Barrier(..) => { + upload_queue.num_inprogress_barriers -= 1; + None + } + UploadOp::Shutdown => unreachable!(), }; // Launch any queued tasks that were unblocked by this one. @@ -2162,6 +2186,7 @@ impl RemoteTimelineClient { num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, + num_inprogress_barriers: 0, inprogress_tasks: HashMap::default(), queued_operations: VecDeque::default(), #[cfg(feature = "testing")] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a87b4278a8..7eedf2ef0a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2611,7 +2611,7 @@ impl Timeline { // (1) and (4) ONLY IF generation number gets bumped. There are some cases where // we load a tenant without bumping the generation number (i.e., detach ancestor // and timeline offload/un-offload). In those cases, we need to rely on the barrier. - self.remote_client.schedule_barrier()?; + self.remote_client.schedule_barrier(true)?; // Tenant::create_timeline will wait for these uploads to happen before returning, or // on retry. diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 592f41cb21..4edbdb3a9d 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -68,6 +68,7 @@ pub(crate) struct UploadQueueInitialized { pub(crate) num_inprogress_layer_uploads: usize, pub(crate) num_inprogress_metadata_uploads: usize, pub(crate) num_inprogress_deletions: usize, + pub(crate) num_inprogress_barriers: usize, /// Tasks that are currently in-progress. In-progress means that a tokio Task /// has been launched for it. An in-progress task can be busy uploading, but it can @@ -176,6 +177,7 @@ impl UploadQueue { num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, + num_inprogress_barriers: 0, inprogress_tasks: HashMap::new(), queued_operations: VecDeque::new(), #[cfg(feature = "testing")] @@ -216,6 +218,7 @@ impl UploadQueue { num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, + num_inprogress_barriers: 0, inprogress_tasks: HashMap::new(), queued_operations: VecDeque::new(), #[cfg(feature = "testing")] @@ -290,7 +293,10 @@ pub(crate) enum UploadOp { Delete(Delete), /// Barrier. When the barrier operation is reached, the channel is closed. - Barrier(tokio::sync::watch::Sender<()>), + /// The boolean value indicates whether the barrier is an initial barrier scheduled + /// at timeline load -- if yes, we will need to wait for all deletions to be completed + /// before the next upload. + Barrier(tokio::sync::watch::Sender<()>, bool), /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise /// this is the same as a Barrier. @@ -317,7 +323,8 @@ impl std::fmt::Display for UploadOp { UploadOp::Delete(delete) => { write!(f, "Delete({} layers)", delete.layers.len()) } - UploadOp::Barrier(_) => write!(f, "Barrier"), + UploadOp::Barrier(_, false) => write!(f, "Barrier"), + UploadOp::Barrier(_, true) => write!(f, "Barrier (initial)"), UploadOp::Shutdown => write!(f, "Shutdown"), } } diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 9acc3da4bb..3afe87bc30 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -174,10 +174,8 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str): generation_before_detach = get_generation_number() env.pageserver.tenant_detach(tenant_id) failpoint_deletion_queue = "deletion-queue-before-execute-pause" - failpoint_upload_queue = "before-delete-layer-pausable" ps_http.configure_failpoints((failpoint_deletion_queue, "pause")) - ps_http.configure_failpoints((failpoint_upload_queue, "off")) if attach_mode == "default_generation": env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) @@ -270,3 +268,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str): final_stat = future_layer_path.stat() log.info(f"future layer path: {future_layer_path}") assert final_stat.st_mtime != pre_stat.st_mtime + + # Ensure no weird errors in the end... + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)