mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
Refactor remote_consistent_lsn updates to use an atomic instead of a
channel
This commit is contained in:
@@ -24,6 +24,7 @@ use tracing::Instrument;
|
||||
use tracing::{self, debug, error};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::AtomicLsn;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use self::backend::BackendQueueWorker;
|
||||
@@ -374,7 +375,7 @@ impl std::fmt::Display for DeletionList {
|
||||
|
||||
struct PendingLsn {
|
||||
projected: Lsn,
|
||||
result_tx: tokio::sync::mpsc::Sender<Lsn>,
|
||||
result_slot: Arc<AtomicLsn>,
|
||||
}
|
||||
|
||||
struct TenantLsnState {
|
||||
@@ -382,10 +383,6 @@ struct TenantLsnState {
|
||||
|
||||
// In what generation was the most recent update proposed?
|
||||
generation: Generation,
|
||||
|
||||
// Any timelines' LSNs projected since this flag was last cleared?
|
||||
// (optimization so that reader doesn't have to walk timelines)
|
||||
dirty: bool,
|
||||
}
|
||||
|
||||
struct VisibleLsnUpdates {
|
||||
@@ -465,7 +462,7 @@ impl DeletionQueueClient {
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
lsn: Lsn,
|
||||
result_tx: tokio::sync::mpsc::Sender<Lsn>,
|
||||
result_slot: Arc<AtomicLsn>,
|
||||
) {
|
||||
let mut locked = self
|
||||
.lsn_table
|
||||
@@ -475,7 +472,6 @@ impl DeletionQueueClient {
|
||||
let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState {
|
||||
timelines: HashMap::new(),
|
||||
generation: current_generation,
|
||||
dirty: true,
|
||||
});
|
||||
|
||||
if tenant_entry.generation != current_generation {
|
||||
@@ -485,13 +481,11 @@ impl DeletionQueueClient {
|
||||
tenant_entry.generation = current_generation;
|
||||
}
|
||||
|
||||
tenant_entry.dirty = true;
|
||||
|
||||
tenant_entry.timelines.insert(
|
||||
timeline_id,
|
||||
PendingLsn {
|
||||
projected: lsn,
|
||||
result_tx,
|
||||
result_slot,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -161,7 +161,7 @@ impl BackendQueueWorker {
|
||||
for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines {
|
||||
// Drop result of send: it is legal for the Timeline to have been dropped along
|
||||
// with its queue receiver while we were doing validation.
|
||||
drop(pending_lsn.result_tx.send(pending_lsn.projected).await);
|
||||
pending_lsn.result_slot.store(pending_lsn.projected);
|
||||
}
|
||||
} else {
|
||||
// If we failed validation, then do not apply any of the projected updates
|
||||
|
||||
@@ -434,10 +434,11 @@ impl RemoteTimelineClient {
|
||||
pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_visible(),
|
||||
UploadQueue::Stopped(q) => q
|
||||
.upload_queue_for_deletion
|
||||
.get_last_remote_consistent_lsn_visible(),
|
||||
UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
|
||||
UploadQueue::Stopped(q) => Some(
|
||||
q.upload_queue_for_deletion
|
||||
.get_last_remote_consistent_lsn_visible(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1227,10 +1228,10 @@ impl RemoteTimelineClient {
|
||||
upload_queue.projected_remote_consistent_lsn = Some(lsn);
|
||||
if self.generation.is_none() {
|
||||
// Legacy mode: skip validating generation
|
||||
upload_queue.visible_remote_consistent_lsn = Some(lsn);
|
||||
upload_queue.visible_remote_consistent_lsn.store(lsn);
|
||||
None
|
||||
} else {
|
||||
Some((lsn, upload_queue.visible_remote_consistent_lsn_tx.clone()))
|
||||
Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
|
||||
}
|
||||
}
|
||||
UploadOp::Delete(_) => {
|
||||
@@ -1245,14 +1246,14 @@ impl RemoteTimelineClient {
|
||||
lsn_update
|
||||
};
|
||||
|
||||
if let Some((lsn, tx)) = lsn_update {
|
||||
if let Some((lsn, slot)) = lsn_update {
|
||||
self.deletion_queue_client
|
||||
.update_remote_consistent_lsn(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
lsn,
|
||||
tx,
|
||||
slot,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -1340,17 +1341,15 @@ impl RemoteTimelineClient {
|
||||
// In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
|
||||
// but for this use case it doesnt really makes sense to bring unsafe code only for this usage point.
|
||||
// Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it.
|
||||
let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) =
|
||||
tokio::sync::mpsc::channel(16);
|
||||
let upload_queue_for_deletion = UploadQueueInitialized {
|
||||
task_counter: 0,
|
||||
latest_files: initialized.latest_files.clone(),
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: initialized.latest_metadata.clone(),
|
||||
projected_remote_consistent_lsn: initialized.visible_remote_consistent_lsn,
|
||||
visible_remote_consistent_lsn: initialized.visible_remote_consistent_lsn,
|
||||
visible_remote_consistent_lsn_tx,
|
||||
visible_remote_consistent_lsn_rx,
|
||||
projected_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn: initialized
|
||||
.visible_remote_consistent_lsn
|
||||
.clone(),
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::fmt::Debug;
|
||||
use chrono::NaiveDateTime;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
use utils::lsn::AtomicLsn;
|
||||
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -58,11 +59,9 @@ pub(crate) struct UploadQueueInitialized {
|
||||
/// Safekeeper can rely on it to make decisions for WAL storage.
|
||||
///
|
||||
/// visible_remote_consistent_lsn is only updated after our generation has been validated with
|
||||
/// the control plane: this is coordinated via the channel defined below.
|
||||
/// the control plane.
|
||||
pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn: Option<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn_rx: tokio::sync::mpsc::Receiver<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn_tx: tokio::sync::mpsc::Sender<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
|
||||
|
||||
// Breakdown of different kinds of tasks currently in-progress
|
||||
pub(crate) num_inprogress_layer_uploads: usize,
|
||||
@@ -86,12 +85,8 @@ impl UploadQueueInitialized {
|
||||
self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
|
||||
}
|
||||
|
||||
pub(super) fn get_last_remote_consistent_lsn_visible(&mut self) -> Option<Lsn> {
|
||||
while let Ok(lsn) = self.visible_remote_consistent_lsn_rx.try_recv() {
|
||||
self.visible_remote_consistent_lsn = Some(lsn);
|
||||
}
|
||||
|
||||
self.visible_remote_consistent_lsn
|
||||
pub(super) fn get_last_remote_consistent_lsn_visible(&mut self) -> Lsn {
|
||||
self.visible_remote_consistent_lsn.load()
|
||||
}
|
||||
|
||||
pub(super) fn get_last_remote_consistent_lsn_projected(&mut self) -> Option<Lsn> {
|
||||
@@ -125,18 +120,13 @@ impl UploadQueue {
|
||||
|
||||
info!("initializing upload queue for empty remote");
|
||||
|
||||
let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) =
|
||||
tokio::sync::mpsc::channel(16);
|
||||
|
||||
let state = UploadQueueInitialized {
|
||||
// As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
|
||||
latest_files: HashMap::new(),
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: metadata.clone(),
|
||||
projected_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn_tx,
|
||||
visible_remote_consistent_lsn_rx,
|
||||
visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
|
||||
// what follows are boring default initializations
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
@@ -174,16 +164,14 @@ impl UploadQueue {
|
||||
index_part.metadata.disk_consistent_lsn()
|
||||
);
|
||||
|
||||
let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) =
|
||||
tokio::sync::mpsc::channel(16);
|
||||
let state = UploadQueueInitialized {
|
||||
latest_files: files,
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: index_part.metadata.clone(),
|
||||
projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
|
||||
visible_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
|
||||
visible_remote_consistent_lsn_tx,
|
||||
visible_remote_consistent_lsn_rx,
|
||||
visible_remote_consistent_lsn: Arc::new(
|
||||
index_part.metadata.disk_consistent_lsn().into(),
|
||||
),
|
||||
// what follows are boring default initializations
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
|
||||
Reference in New Issue
Block a user