mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
move lsn visibility related stuff into separate module
This commit is contained in:
@@ -139,41 +139,13 @@ pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
lsn_table: Arc<std::sync::RwLock<lsn_visibility::VisibleLsnUpdates>>,
|
||||
}
|
||||
|
||||
mod deletion_list;
|
||||
use deletion_list::*;
|
||||
|
||||
struct PendingLsn {
|
||||
projected: Lsn,
|
||||
result_slot: Arc<AtomicLsn>,
|
||||
}
|
||||
|
||||
struct TenantLsnState {
|
||||
timelines: HashMap<TimelineId, PendingLsn>,
|
||||
|
||||
// In what generation was the most recent update proposed?
|
||||
generation: Generation,
|
||||
}
|
||||
|
||||
struct VisibleLsnUpdates {
|
||||
tenants: HashMap<TenantId, TenantLsnState>,
|
||||
}
|
||||
|
||||
impl VisibleLsnUpdates {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
tenants: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for VisibleLsnUpdates {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
|
||||
}
|
||||
}
|
||||
mod lsn_visibility;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum DeletionQueueError {
|
||||
@@ -189,7 +161,9 @@ impl DeletionQueueClient {
|
||||
Self {
|
||||
tx,
|
||||
executor_tx,
|
||||
lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
|
||||
lsn_table: Arc::new(std::sync::RwLock::new(
|
||||
lsn_visibility::VisibleLsnUpdates::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,10 +215,14 @@ impl DeletionQueueClient {
|
||||
.write()
|
||||
.expect("Lock should never be poisoned");
|
||||
|
||||
let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState {
|
||||
timelines: HashMap::new(),
|
||||
generation: current_generation,
|
||||
});
|
||||
let tenant_entry =
|
||||
locked
|
||||
.tenants
|
||||
.entry(tenant_id)
|
||||
.or_insert(lsn_visibility::TenantLsnState {
|
||||
timelines: HashMap::new(),
|
||||
generation: current_generation,
|
||||
});
|
||||
|
||||
if tenant_entry.generation != current_generation {
|
||||
// Generation might have changed if we were detached and then re-attached: in this case,
|
||||
@@ -255,7 +233,7 @@ impl DeletionQueueClient {
|
||||
|
||||
tenant_entry.timelines.insert(
|
||||
timeline_id,
|
||||
PendingLsn {
|
||||
lsn_visibility::PendingLsn {
|
||||
projected: lsn,
|
||||
result_slot,
|
||||
},
|
||||
@@ -419,7 +397,9 @@ impl DeletionQueue {
|
||||
// happen in the backend (persistent), not in this queue.
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
|
||||
let lsn_table = Arc::new(std::sync::RwLock::new(
|
||||
lsn_visibility::VisibleLsnUpdates::new(),
|
||||
));
|
||||
|
||||
// The deletion queue has an independent cancellation token to
|
||||
// the general pageserver shutdown token, because it stays alive a bit
|
||||
@@ -1034,7 +1014,7 @@ pub mod mock {
|
||||
executed: Arc<AtomicUsize>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
consumer: std::sync::Mutex<ConsumerState>,
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
lsn_table: Arc<std::sync::RwLock<lsn_visibility::VisibleLsnUpdates>>,
|
||||
}
|
||||
|
||||
impl MockDeletionQueue {
|
||||
@@ -1050,7 +1030,9 @@ pub mod mock {
|
||||
executed,
|
||||
remote_storage,
|
||||
consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
|
||||
lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
|
||||
lsn_table: Arc::new(std::sync::RwLock::new(
|
||||
lsn_visibility::VisibleLsnUpdates::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,11 +13,11 @@ use crate::metrics::DELETION_QUEUE_DROPPED;
|
||||
use crate::metrics::DELETION_QUEUE_ERRORS;
|
||||
|
||||
use super::executor::ExecutorMessage;
|
||||
use super::lsn_visibility;
|
||||
use super::DeletionHeader;
|
||||
use super::DeletionList;
|
||||
use super::DeletionQueueError;
|
||||
use super::FlushOp;
|
||||
use super::VisibleLsnUpdates;
|
||||
|
||||
// After this length of time, do any validation work that is pending,
|
||||
// even if we haven't accumulated many keys to delete.
|
||||
@@ -58,7 +58,7 @@ where
|
||||
// Lsn validation state: we read projected LSNs and write back visible LSNs
|
||||
// after validation. This is the LSN equivalent of `pending_validation_lists`:
|
||||
// it is drained in [`validate`]
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
lsn_table: Arc<std::sync::RwLock<lsn_visibility::VisibleLsnUpdates>>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
@@ -72,7 +72,7 @@ where
|
||||
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
control_plane_client: Option<C>,
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
lsn_table: Arc<std::sync::RwLock<lsn_visibility::VisibleLsnUpdates>>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -122,7 +122,7 @@ where
|
||||
|
||||
let pending_lsn_updates = {
|
||||
let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
|
||||
let mut pending_updates = VisibleLsnUpdates::new();
|
||||
let mut pending_updates = lsn_visibility::VisibleLsnUpdates::new();
|
||||
std::mem::swap(&mut pending_updates, &mut lsn_table);
|
||||
pending_updates
|
||||
};
|
||||
|
||||
43
pageserver/src/deletion_queue/lsn_visibility.rs
Normal file
43
pageserver/src/deletion_queue/lsn_visibility.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
use utils::id::TenantId;
|
||||
|
||||
use utils::generation::Generation;
|
||||
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use utils::lsn::AtomicLsn;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(crate) struct PendingLsn {
|
||||
pub(crate) projected: Lsn,
|
||||
pub(crate) result_slot: Arc<AtomicLsn>,
|
||||
}
|
||||
|
||||
pub(crate) struct TenantLsnState {
|
||||
pub(crate) timelines: HashMap<TimelineId, PendingLsn>,
|
||||
|
||||
// In what generation was the most recent update proposed?
|
||||
pub(crate) generation: Generation,
|
||||
}
|
||||
|
||||
pub(crate) struct VisibleLsnUpdates {
|
||||
pub(crate) tenants: HashMap<TenantId, TenantLsnState>,
|
||||
}
|
||||
|
||||
impl VisibleLsnUpdates {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
tenants: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for VisibleLsnUpdates {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user