diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 447d459435..0f36808b4a 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -139,41 +139,13 @@ pub struct DeletionQueueClient { tx: tokio::sync::mpsc::Sender, executor_tx: tokio::sync::mpsc::Sender, - lsn_table: Arc>, + lsn_table: Arc>, } mod deletion_list; use deletion_list::*; -struct PendingLsn { - projected: Lsn, - result_slot: Arc, -} - -struct TenantLsnState { - timelines: HashMap, - - // In what generation was the most recent update proposed? - generation: Generation, -} - -struct VisibleLsnUpdates { - tenants: HashMap, -} - -impl VisibleLsnUpdates { - fn new() -> Self { - Self { - tenants: HashMap::new(), - } - } -} - -impl std::fmt::Debug for VisibleLsnUpdates { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len()) - } -} +mod lsn_visibility; #[derive(Error, Debug)] pub enum DeletionQueueError { @@ -189,7 +161,9 @@ impl DeletionQueueClient { Self { tx, executor_tx, - lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), + lsn_table: Arc::new(std::sync::RwLock::new( + lsn_visibility::VisibleLsnUpdates::new(), + )), } } @@ -241,10 +215,14 @@ impl DeletionQueueClient { .write() .expect("Lock should never be poisoned"); - let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState { - timelines: HashMap::new(), - generation: current_generation, - }); + let tenant_entry = + locked + .tenants + .entry(tenant_id) + .or_insert(lsn_visibility::TenantLsnState { + timelines: HashMap::new(), + generation: current_generation, + }); if tenant_entry.generation != current_generation { // Generation might have changed if we were detached and then re-attached: in this case, @@ -255,7 +233,7 @@ impl DeletionQueueClient { tenant_entry.timelines.insert( timeline_id, - PendingLsn { + lsn_visibility::PendingLsn { projected: lsn, result_slot, }, @@ -419,7 +397,9 @@ impl DeletionQueue { // happen in the backend (persistent), not in this queue. let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16); - let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); + let lsn_table = Arc::new(std::sync::RwLock::new( + lsn_visibility::VisibleLsnUpdates::new(), + )); // The deletion queue has an independent cancellation token to // the general pageserver shutdown token, because it stays alive a bit @@ -1034,7 +1014,7 @@ pub mod mock { executed: Arc, remote_storage: Option, consumer: std::sync::Mutex, - lsn_table: Arc>, + lsn_table: Arc>, } impl MockDeletionQueue { @@ -1050,7 +1030,9 @@ pub mod mock { executed, remote_storage, consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }), - lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), + lsn_table: Arc::new(std::sync::RwLock::new( + lsn_visibility::VisibleLsnUpdates::new(), + )), } } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index 7777ded893..88334f5f88 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -13,11 +13,11 @@ use crate::metrics::DELETION_QUEUE_DROPPED; use crate::metrics::DELETION_QUEUE_ERRORS; use super::executor::ExecutorMessage; +use super::lsn_visibility; use super::DeletionHeader; use super::DeletionList; use super::DeletionQueueError; use super::FlushOp; -use super::VisibleLsnUpdates; // After this length of time, do any validation work that is pending, // even if we haven't accumulated many keys to delete. @@ -58,7 +58,7 @@ where // Lsn validation state: we read projected LSNs and write back visible LSNs // after validation. This is the LSN equivalent of `pending_validation_lists`: // it is drained in [`validate`] - lsn_table: Arc>, + lsn_table: Arc>, cancel: CancellationToken, } @@ -72,7 +72,7 @@ where rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, control_plane_client: Option, - lsn_table: Arc>, + lsn_table: Arc>, cancel: CancellationToken, ) -> Self { Self { @@ -122,7 +122,7 @@ where let pending_lsn_updates = { let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned"); - let mut pending_updates = VisibleLsnUpdates::new(); + let mut pending_updates = lsn_visibility::VisibleLsnUpdates::new(); std::mem::swap(&mut pending_updates, &mut lsn_table); pending_updates }; diff --git a/pageserver/src/deletion_queue/lsn_visibility.rs b/pageserver/src/deletion_queue/lsn_visibility.rs new file mode 100644 index 0000000000..33ff78c7f3 --- /dev/null +++ b/pageserver/src/deletion_queue/lsn_visibility.rs @@ -0,0 +1,43 @@ +use utils::id::TenantId; + +use utils::generation::Generation; + +use utils::id::TimelineId; + +use std::collections::HashMap; + +use utils::lsn::AtomicLsn; + +use std::sync::Arc; + +use utils::lsn::Lsn; + +pub(crate) struct PendingLsn { + pub(crate) projected: Lsn, + pub(crate) result_slot: Arc, +} + +pub(crate) struct TenantLsnState { + pub(crate) timelines: HashMap, + + // In what generation was the most recent update proposed? + pub(crate) generation: Generation, +} + +pub(crate) struct VisibleLsnUpdates { + pub(crate) tenants: HashMap, +} + +impl VisibleLsnUpdates { + pub(crate) fn new() -> Self { + Self { + tenants: HashMap::new(), + } + } +} + +impl std::fmt::Debug for VisibleLsnUpdates { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len()) + } +}