diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 37fa300467..907c3f9abb 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -3,6 +3,7 @@ mod list_writer; mod validator; use std::collections::HashMap; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Duration; @@ -157,6 +158,7 @@ pub struct DeletionQueueClient { executor_tx: tokio::sync::mpsc::Sender, lsn_table: Arc>, + max_layer_generation_in_queue: Arc, } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -382,6 +384,17 @@ pub enum DeletionQueueError { } impl DeletionQueueClient { + /// Returns whether the deletion queue may contain a layer file with generation >= `gen`. + pub(crate) fn maybe_processing_generation(&self, gen: Generation) -> bool { + if let Some(gen) = gen.into() { + self.max_layer_generation_in_queue + .load(std::sync::atomic::Ordering::SeqCst) + >= gen + } else { + false + } + } + /// This is cancel-safe. If you drop the future before it completes, the message /// is not pushed, although in the context of the deletion queue it doesn't matter: once /// we decide to do a deletion the decision is always final. 
@@ -505,6 +518,13 @@ impl DeletionQueueClient { metrics::DELETION_QUEUE .keys_submitted .inc_by(layers.len() as u64); + for (_, meta) in &layers { + let Some(gen) = meta.generation.into() else { + continue; + }; + self.max_layer_generation_in_queue + .fetch_max(gen, std::sync::atomic::Ordering::SeqCst); + } self.do_push( &self.tx, ListWriterQueueMessage::Delete(DeletionOp { @@ -651,6 +671,7 @@ impl DeletionQueue { tx, executor_tx: executor_tx.clone(), lsn_table: lsn_table.clone(), + max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)), }, cancel: cancel.clone(), }, @@ -1265,6 +1286,7 @@ pub(crate) mod mock { tx: self.tx.clone(), executor_tx: self.executor_tx.clone(), lsn_table: self.lsn_table.clone(), + max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)), } } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2c54f5c991..bff5bd1bed 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -244,7 +244,7 @@ use self::index::IndexPart; use super::config::AttachedLocationConfig; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; -use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; +use super::upload_queue::{BarrierType, NotInitialized, SetDeletedFlagProgress}; use super::{DeleteTimelineError, Generation}; pub(crate) use download::{ @@ -896,7 +896,7 @@ impl RemoteTimelineClient { self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue, false)) + Some(self.schedule_wait_barrier0(upload_queue)) } }; @@ -935,7 +935,7 @@ impl RemoteTimelineClient { self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue, false)) + Some(self.schedule_wait_barrier0(upload_queue)) } }; @@ -974,7 +974,7 @@ impl RemoteTimelineClient { match (current, uploaded) { (x, y) if wanted(x) && wanted(y) => None, (x, y) if wanted(x) && !wanted(y) 
=> { - Some(self.schedule_barrier0(upload_queue, false)) + Some(self.schedule_wait_barrier0(upload_queue)) } // Usual case: !wanted(x) && !wanted(y) // @@ -992,7 +992,7 @@ impl RemoteTimelineClient { .map(|x| x.with_reason(reason)) .or_else(|| Some(index::GcBlocking::started_now_for(reason))); self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue, false)) + Some(self.schedule_wait_barrier0(upload_queue)) } } }; @@ -1036,7 +1036,7 @@ impl RemoteTimelineClient { match (current, uploaded) { (x, y) if wanted(x) && wanted(y) => None, (x, y) if wanted(x) && !wanted(y) => { - Some(self.schedule_barrier0(upload_queue, false)) + Some(self.schedule_wait_barrier0(upload_queue)) } (x, y) => { if !wanted(x) && wanted(y) { @@ -1048,7 +1048,7 @@ impl RemoteTimelineClient { assert!(wanted(upload_queue.dirty.gc_blocking.as_ref())); // FIXME: bogus ? self.schedule_index_upload(upload_queue)?; - Some(self.schedule_barrier0(upload_queue, false)) + Some(self.schedule_wait_barrier0(upload_queue)) } } }; @@ -1304,7 +1304,7 @@ impl RemoteTimelineClient { let upload_queue = guard .initialized_mut() .map_err(WaitCompletionError::NotInitialized)?; - self.schedule_barrier0(upload_queue, false) + self.schedule_wait_barrier0(upload_queue) }; Self::wait_completion0(receiver).await @@ -1320,20 +1320,32 @@ impl RemoteTimelineClient { Ok(()) } - pub(crate) fn schedule_barrier(self: &Arc, initial_barrier: bool) -> anyhow::Result<()> { + pub fn schedule_initial_barrier(self: &Arc) -> anyhow::Result<()> { + self.schedule_barrier(BarrierType::Initial) + } + + fn schedule_barrier(self: &Arc, barrier: BarrierType) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - self.schedule_barrier0(upload_queue, initial_barrier); + self.schedule_barrier0(upload_queue, barrier); Ok(()) } + /// Schedule a barrier to wait for all previously scheduled operations to complete. 
+ fn schedule_wait_barrier0( + self: &Arc, + upload_queue: &mut UploadQueueInitialized, + ) -> tokio::sync::watch::Receiver<()> { + self.schedule_barrier0(upload_queue, BarrierType::Normal) + } + fn schedule_barrier0( self: &Arc, upload_queue: &mut UploadQueueInitialized, - initial_barrier: bool, + barrier: BarrierType, ) -> tokio::sync::watch::Receiver<()> { let (sender, receiver) = tokio::sync::watch::channel(()); - let barrier_op = UploadOp::Barrier(sender, initial_barrier); + let barrier_op = UploadOp::Barrier(sender, barrier); upload_queue.queued_operations.push_back(barrier_op); // Don't count this kind of operation! @@ -1801,9 +1813,12 @@ impl RemoteTimelineClient { while let Some(next_op) = upload_queue.queued_operations.front() { // Can we run this task now? let can_run_now = match next_op { - UploadOp::UploadLayer(..) => { - // Can always be scheduled except when there's a barrier + UploadOp::UploadLayer(_, meta) => { + // Can be scheduled when no barrier is in progress, or when the deletion queue cannot contain a file of the same or a higher generation. upload_queue.num_inprogress_barriers == 0 + || !self + .deletion_queue_client + .maybe_processing_generation(meta.generation) } UploadOp::UploadMetadata { .. } => { // These can only be performed after all the preceding operations @@ -1853,12 +1868,12 @@ impl RemoteTimelineClient { UploadOp::Delete(_) => { upload_queue.num_inprogress_deletions += 1; } - UploadOp::Barrier(sender, false) => { + UploadOp::Barrier(sender, BarrierType::Normal) => { // For other barriers, simply send back the ack. sender.send_replace(()); continue; } - UploadOp::Barrier(_, true) => { + UploadOp::Barrier(_, BarrierType::Initial) => { // For initial barrier, we need to wait for deletions. 
upload_queue.num_inprogress_barriers += 1; } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5adbce268f..93e3b888c0 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2649,7 +2649,7 @@ impl Timeline { // (1) and (4) ONLY IF generation number gets bumped. There are some cases where // we load a tenant without bumping the generation number (i.e., detach ancestor // and timeline offload/un-offload). In those cases, we need to rely on the barrier. - self.remote_client.schedule_barrier(true)?; + self.remote_client.schedule_initial_barrier()?; // Tenant::create_timeline will wait for these uploads to happen before returning, or // on retry. diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index a7500c4af9..ee9039b312 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -283,6 +283,14 @@ pub(crate) struct Delete { pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>, } +#[derive(Debug)] +pub(crate) enum BarrierType { + /// Barrier is a normal barrier, not an initial barrier. + Normal, + /// Barrier is an initial barrier, scheduled at timeline load. + Initial, +} + #[derive(Debug)] pub(crate) enum UploadOp { /// Upload a layer file @@ -301,7 +309,7 @@ pub(crate) enum UploadOp { /// The boolean value indicates whether the barrier is an initial barrier scheduled /// at timeline load -- if yes, we will need to wait for all deletions to be completed /// before the next upload. - Barrier(tokio::sync::watch::Sender<()>, bool), + Barrier(tokio::sync::watch::Sender<()>, BarrierType), /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise /// this is the same as a Barrier. 
@@ -328,8 +336,8 @@ impl std::fmt::Display for UploadOp { UploadOp::Delete(delete) => { write!(f, "Delete({} layers)", delete.layers.len()) } - UploadOp::Barrier(_, false) => write!(f, "Barrier"), - UploadOp::Barrier(_, true) => write!(f, "Barrier (initial)"), + UploadOp::Barrier(_, BarrierType::Normal) => write!(f, "Barrier"), + UploadOp::Barrier(_, BarrierType::Initial) => write!(f, "Barrier (initial)"), UploadOp::Shutdown => write!(f, "Shutdown"), } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index dae440b18d..9e778e6476 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4947,6 +4947,7 @@ def last_flush_lsn_upload( timeline_id: TimelineId, pageserver_id: int | None = None, auth_token: str | None = None, + wait_until_uploaded: bool = True, ) -> Lsn: """ Wait for pageserver to catch to the latest flush LSN of given endpoint, @@ -4960,7 +4961,9 @@ def last_flush_lsn_upload( for tenant_shard_id, pageserver in shards: ps_http = pageserver.http_client(auth_token=auth_token) wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn) - ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_checkpoint( + tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded + ) return last_flush_lsn @@ -4985,7 +4988,7 @@ def generate_uploads_and_deletions( timeline_id: TimelineId | None = None, data: str | None = None, pageserver: NeonPageserver, - wait_for_upload: bool = True, + wait_until_uploaded: bool = True, ): """ Using the environment's default tenant + timeline, generate a load pattern @@ -5008,7 +5011,12 @@ def generate_uploads_and_deletions( if init: endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + env, + endpoint, + tenant_id, + timeline_id, + pageserver_id=pageserver.id, + 
wait_until_uploaded=wait_until_uploaded, ) def churn(data): @@ -5031,7 +5039,12 @@ def generate_uploads_and_deletions( # in a state where there are "future layers" in remote storage that will generate deletions # after a restart. last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + env, + endpoint, + tenant_id, + timeline_id, + pageserver_id=pageserver.id, + wait_until_uploaded=wait_until_uploaded, ) # Compaction should generate some GC-elegible layers @@ -5047,4 +5060,4 @@ def generate_uploads_and_deletions( # background ingest, no more uploads pending, and therefore no non-determinism # in subsequent actions like pageserver restarts. flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id) - ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_for_upload) + ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 4df624def3..feba43f7af 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -794,7 +794,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): if compact is not None: query["compact"] = "true" if compact else "false" - log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") + log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}") res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", params=query, diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index bc15b48645..6ba5753420 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -462,7 +462,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, 
pg_bin: PgBin): # Because it is in emergency mode, it will not attempt to validate deletions required by the initial barrier, and therefore # other files cannot be uploaded b/c it's waiting for the initial barrier to be validated. generate_uploads_and_deletions( - env, init=False, pageserver=env.pageserver, wait_for_upload=False + env, init=False, pageserver=env.pageserver, wait_until_uploaded=False ) # The pageserver should neither validate nor execute any deletions, it should have