mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
pageserver: use deferred updates to remote_consistent_lsn
This commit is contained in:
@@ -363,8 +363,15 @@ pub struct TimelineInfo {
|
||||
pub latest_gc_cutoff_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
|
||||
/// The LSN that we have succesfully uploaded to remote storage
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
|
||||
/// The LSN that we are advertizing to safekeepers
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn_visible: Lsn,
|
||||
|
||||
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
/// Sum of the size of all layer files.
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
|
||||
@@ -295,7 +295,12 @@ async fn build_timeline_info_common(
|
||||
};
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
let remote_consistent_lsn_projected = timeline
|
||||
.get_remote_consistent_lsn_projected()
|
||||
.unwrap_or(Lsn(0));
|
||||
let remote_consistent_lsn_visible = timeline
|
||||
.get_remote_consistent_lsn_visible()
|
||||
.unwrap_or(Lsn(0));
|
||||
|
||||
let walreceiver_status = timeline.walreceiver_status();
|
||||
|
||||
@@ -305,7 +310,8 @@ async fn build_timeline_info_common(
|
||||
ancestor_timeline_id,
|
||||
ancestor_lsn,
|
||||
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
|
||||
remote_consistent_lsn,
|
||||
remote_consistent_lsn: remote_consistent_lsn_projected,
|
||||
remote_consistent_lsn_visible,
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
|
||||
@@ -116,8 +116,12 @@
|
||||
//! # Completion
|
||||
//!
|
||||
//! Once an operation has completed, we update
|
||||
//! [`UploadQueueInitialized::last_uploaded_consistent_lsn`] which indicates
|
||||
//! to safekeepers that they can delete the WAL up to that LSN.
|
||||
//! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
|
||||
//! and submit a request through the DeletionQueue to update
|
||||
//! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
|
||||
//! validated that our generation is not stale. It is this visible value
|
||||
//! that is advertized to safekeepers as a signal that that they can
|
||||
//! delete the WAL up to that LSN.
|
||||
//!
|
||||
//! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
|
||||
//! for all pending operations to complete. It does not prevent more
|
||||
@@ -417,13 +421,23 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
|
||||
match &*self.upload_queue.lock().unwrap() {
|
||||
pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
|
||||
UploadQueue::Stopped(q) => {
|
||||
Some(q.upload_queue_for_deletion.last_uploaded_consistent_lsn)
|
||||
}
|
||||
UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
|
||||
UploadQueue::Stopped(q) => q
|
||||
.upload_queue_for_deletion
|
||||
.get_last_remote_consistent_lsn_projected(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_visible(),
|
||||
UploadQueue::Stopped(q) => q
|
||||
.upload_queue_for_deletion
|
||||
.get_last_remote_consistent_lsn_visible(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1178,7 +1192,7 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
// The task has completed successfully. Remove it from the in-progress list.
|
||||
{
|
||||
let lsn_update = {
|
||||
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = match upload_queue_guard.deref_mut() {
|
||||
UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
|
||||
@@ -1198,23 +1212,48 @@ impl RemoteTimelineClient {
|
||||
|
||||
upload_queue.inprogress_tasks.remove(&task.task_id);
|
||||
|
||||
match task.op {
|
||||
let lsn_update = match task.op {
|
||||
UploadOp::UploadLayer(_, _) => {
|
||||
upload_queue.num_inprogress_layer_uploads -= 1;
|
||||
None
|
||||
}
|
||||
UploadOp::UploadMetadata(_, lsn) => {
|
||||
upload_queue.num_inprogress_metadata_uploads -= 1;
|
||||
upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
|
||||
// XXX monotonicity check?
|
||||
|
||||
upload_queue.projected_remote_consistent_lsn = Some(lsn);
|
||||
if self.generation.is_none() {
|
||||
// Legacy mode: skip validating generation
|
||||
upload_queue.visible_remote_consistent_lsn = Some(lsn);
|
||||
None
|
||||
} else {
|
||||
Some((lsn, upload_queue.visible_remote_consistent_lsn_tx.clone()))
|
||||
}
|
||||
}
|
||||
UploadOp::Delete(_) => {
|
||||
upload_queue.num_inprogress_deletions -= 1;
|
||||
None
|
||||
}
|
||||
UploadOp::Barrier(_) => unreachable!(),
|
||||
};
|
||||
|
||||
// Launch any queued tasks that were unblocked by this one.
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
lsn_update
|
||||
};
|
||||
|
||||
if let Some((lsn, tx)) = lsn_update {
|
||||
self.deletion_queue_client
|
||||
.update_remote_consistent_lsn(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
lsn,
|
||||
tx,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
self.calls_unfinished_metric_end(&task.op);
|
||||
}
|
||||
|
||||
@@ -1298,12 +1337,17 @@ impl RemoteTimelineClient {
|
||||
// In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
|
||||
// but for this use case it doesnt really makes sense to bring unsafe code only for this usage point.
|
||||
// Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it.
|
||||
let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) =
|
||||
tokio::sync::mpsc::channel(16);
|
||||
let upload_queue_for_deletion = UploadQueueInitialized {
|
||||
task_counter: 0,
|
||||
latest_files: initialized.latest_files.clone(),
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: initialized.latest_metadata.clone(),
|
||||
last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn,
|
||||
projected_remote_consistent_lsn: initialized.visible_remote_consistent_lsn,
|
||||
visible_remote_consistent_lsn: initialized.visible_remote_consistent_lsn,
|
||||
visible_remote_consistent_lsn_tx,
|
||||
visible_remote_consistent_lsn_rx,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
|
||||
@@ -523,9 +523,21 @@ impl Timeline {
|
||||
self.disk_consistent_lsn.load()
|
||||
}
|
||||
|
||||
pub fn get_remote_consistent_lsn(&self) -> Option<Lsn> {
|
||||
/// remote_consistent_lsn for objects uploaded in the tenant's current generation
|
||||
pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.last_uploaded_consistent_lsn()
|
||||
remote_client.remote_consistent_lsn_projected()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
|
||||
/// i.e. a value of remote_consistent_lsn_projected which has undergone
|
||||
/// generation validation in the deletion queue.
|
||||
pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.remote_consistent_lsn_visible()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
@@ -370,8 +370,9 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
})?;
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let timeline_remote_consistent_lsn =
|
||||
timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
let timeline_remote_consistent_lsn = timeline
|
||||
.get_remote_consistent_lsn_visible()
|
||||
.unwrap_or(Lsn(0));
|
||||
|
||||
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
|
||||
let last_received_lsn = last_lsn;
|
||||
|
||||
@@ -56,7 +56,13 @@ pub(crate) struct UploadQueueInitialized {
|
||||
/// uploaded. `Lsn(0)` if nothing was uploaded yet.
|
||||
/// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
|
||||
/// Safekeeper can rely on it to make decisions for WAL storage.
|
||||
pub(crate) last_uploaded_consistent_lsn: Lsn,
|
||||
///
|
||||
/// visible_remote_consistent_lsn is only updated after our generation has been validated with
|
||||
/// the control plane: this is coordinated via the channel defined below.
|
||||
pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn: Option<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn_rx: tokio::sync::mpsc::Receiver<Lsn>,
|
||||
pub(crate) visible_remote_consistent_lsn_tx: tokio::sync::mpsc::Sender<Lsn>,
|
||||
|
||||
// Breakdown of different kinds of tasks currently in-progress
|
||||
pub(crate) num_inprogress_layer_uploads: usize,
|
||||
@@ -79,6 +85,18 @@ impl UploadQueueInitialized {
|
||||
pub(super) fn no_pending_work(&self) -> bool {
|
||||
self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
|
||||
}
|
||||
|
||||
pub(super) fn get_last_remote_consistent_lsn_visible(&mut self) -> Option<Lsn> {
|
||||
while let Ok(lsn) = self.visible_remote_consistent_lsn_rx.try_recv() {
|
||||
self.visible_remote_consistent_lsn = Some(lsn);
|
||||
}
|
||||
|
||||
self.visible_remote_consistent_lsn
|
||||
}
|
||||
|
||||
pub(super) fn get_last_remote_consistent_lsn_projected(&mut self) -> Option<Lsn> {
|
||||
self.projected_remote_consistent_lsn
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -107,14 +125,18 @@ impl UploadQueue {
|
||||
|
||||
info!("initializing upload queue for empty remote");
|
||||
|
||||
let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) =
|
||||
tokio::sync::mpsc::channel(16);
|
||||
|
||||
let state = UploadQueueInitialized {
|
||||
// As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
|
||||
latest_files: HashMap::new(),
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: metadata.clone(),
|
||||
// We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent
|
||||
// safekeepers from garbage-collecting anything.
|
||||
last_uploaded_consistent_lsn: Lsn(0),
|
||||
projected_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn_tx,
|
||||
visible_remote_consistent_lsn_rx,
|
||||
// what follows are boring default initializations
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
@@ -152,11 +174,16 @@ impl UploadQueue {
|
||||
index_part.metadata.disk_consistent_lsn()
|
||||
);
|
||||
|
||||
let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) =
|
||||
tokio::sync::mpsc::channel(16);
|
||||
let state = UploadQueueInitialized {
|
||||
latest_files: files,
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: index_part.metadata.clone(),
|
||||
last_uploaded_consistent_lsn: index_part.metadata.disk_consistent_lsn(),
|
||||
projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
|
||||
visible_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
|
||||
visible_remote_consistent_lsn_tx,
|
||||
visible_remote_consistent_lsn_rx,
|
||||
// what follows are boring default initializations
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
|
||||
Reference in New Issue
Block a user