From 960dd9a206fe7917d7bef8f1211ee05e3026e4e0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 5 Sep 2023 10:24:59 +0100 Subject: [PATCH] pageserver: use deferred updates to remote_consistent_lsn --- libs/pageserver_api/src/models.rs | 7 ++ pageserver/src/http/routes.rs | 10 ++- .../src/tenant/remote_timeline_client.rs | 68 +++++++++++++++---- pageserver/src/tenant/timeline.rs | 16 ++++- .../walreceiver/walreceiver_connection.rs | 5 +- pageserver/src/tenant/upload_queue.rs | 37 ++++++++-- 6 files changed, 120 insertions(+), 23 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index f354296be2..68620787bb 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -363,8 +363,15 @@ pub struct TimelineInfo { pub latest_gc_cutoff_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, + + /// The LSN that we have successfully uploaded to remote storage #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, + + /// The LSN that we are advertising to safekeepers + #[serde_as(as = "DisplayFromStr")] + pub remote_consistent_lsn_visible: Lsn, + pub current_logical_size: Option, // is None when timeline is Unloaded /// Sum of the size of all layer files. /// If a layer is present in both local FS and S3, it counts only once. 
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 22100a13ab..986f86ec93 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -295,7 +295,12 @@ async fn build_timeline_info_common( }; let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); - let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); + let remote_consistent_lsn_projected = timeline + .get_remote_consistent_lsn_projected() + .unwrap_or(Lsn(0)); + let remote_consistent_lsn_visible = timeline + .get_remote_consistent_lsn_visible() + .unwrap_or(Lsn(0)); let walreceiver_status = timeline.walreceiver_status(); @@ -305,7 +310,8 @@ async fn build_timeline_info_common( ancestor_timeline_id, ancestor_lsn, disk_consistent_lsn: timeline.get_disk_consistent_lsn(), - remote_consistent_lsn, + remote_consistent_lsn: remote_consistent_lsn_projected, + remote_consistent_lsn_visible, last_record_lsn, prev_record_lsn: Some(timeline.get_prev_record_lsn()), latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(), diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 0f435308af..c13e0d1723 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -116,8 +116,12 @@ //! # Completion //! //! Once an operation has completed, we update -//! [`UploadQueueInitialized::last_uploaded_consistent_lsn`] which indicates -//! to safekeepers that they can delete the WAL up to that LSN. +//! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately, +//! and submit a request through the DeletionQueue to update +//! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has +//! validated that our generation is not stale. It is this visible value +//! that is advertised to safekeepers as a signal that they can +//! delete the WAL up to that LSN. //! //! 
The [`RemoteTimelineClient::wait_completion`] method can be used to wait //! for all pending operations to complete. It does not prevent more @@ -417,13 +421,23 @@ impl RemoteTimelineClient { Ok(()) } - pub fn last_uploaded_consistent_lsn(&self) -> Option { - match &*self.upload_queue.lock().unwrap() { + pub fn remote_consistent_lsn_projected(&self) -> Option { + match &mut *self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, - UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn), - UploadQueue::Stopped(q) => { - Some(q.upload_queue_for_deletion.last_uploaded_consistent_lsn) - } + UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(), + UploadQueue::Stopped(q) => q + .upload_queue_for_deletion + .get_last_remote_consistent_lsn_projected(), + } + } + + pub fn remote_consistent_lsn_visible(&self) -> Option { + match &mut *self.upload_queue.lock().unwrap() { + UploadQueue::Uninitialized => None, + UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_visible(), + UploadQueue::Stopped(q) => q + .upload_queue_for_deletion + .get_last_remote_consistent_lsn_visible(), } } @@ -1178,7 +1192,7 @@ impl RemoteTimelineClient { } // The task has completed successfully. Remove it from the in-progress list. - { + let lsn_update = { let mut upload_queue_guard = self.upload_queue.lock().unwrap(); let upload_queue = match upload_queue_guard.deref_mut() { UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"), @@ -1198,23 +1212,48 @@ impl RemoteTimelineClient { upload_queue.inprogress_tasks.remove(&task.task_id); - match task.op { + let lsn_update = match task.op { UploadOp::UploadLayer(_, _) => { upload_queue.num_inprogress_layer_uploads -= 1; + None } UploadOp::UploadMetadata(_, lsn) => { upload_queue.num_inprogress_metadata_uploads -= 1; - upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check? + // XXX monotonicity check? 
+ + upload_queue.projected_remote_consistent_lsn = Some(lsn); + if self.generation.is_none() { + // Legacy mode: skip validating generation + upload_queue.visible_remote_consistent_lsn = Some(lsn); + None + } else { + Some((lsn, upload_queue.visible_remote_consistent_lsn_tx.clone())) + } } UploadOp::Delete(_) => { upload_queue.num_inprogress_deletions -= 1; + None } UploadOp::Barrier(_) => unreachable!(), }; // Launch any queued tasks that were unblocked by this one. self.launch_queued_tasks(upload_queue); + lsn_update + }; + + if let Some((lsn, tx)) = lsn_update { + self.deletion_queue_client + .update_remote_consistent_lsn( + self.tenant_id, + self.timeline_id, + self.generation, + lsn, + tx, + ) + .await; } + self.calls_unfinished_metric_end(&task.op); } @@ -1298,12 +1337,17 @@ impl RemoteTimelineClient { // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point. // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it. 
+ let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) = + tokio::sync::mpsc::channel(16); let upload_queue_for_deletion = UploadQueueInitialized { task_counter: 0, latest_files: initialized.latest_files.clone(), latest_files_changes_since_metadata_upload_scheduled: 0, latest_metadata: initialized.latest_metadata.clone(), - last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn, + projected_remote_consistent_lsn: initialized.visible_remote_consistent_lsn, + visible_remote_consistent_lsn: initialized.visible_remote_consistent_lsn, + visible_remote_consistent_lsn_tx, + visible_remote_consistent_lsn_rx, num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2329dc25bc..73dafa4322 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -523,9 +523,21 @@ impl Timeline { self.disk_consistent_lsn.load() } - pub fn get_remote_consistent_lsn(&self) -> Option { + /// remote_consistent_lsn for objects uploaded in the tenant's current generation + pub fn get_remote_consistent_lsn_projected(&self) -> Option { if let Some(remote_client) = &self.remote_client { - remote_client.last_uploaded_consistent_lsn() + remote_client.remote_consistent_lsn_projected() + } else { + None + } + } + + /// remote_consistent_lsn which the tenant is guaranteed not to go backward from, + /// i.e. a value of remote_consistent_lsn_projected which has undergone + /// generation validation in the deletion queue. 
+ pub fn get_remote_consistent_lsn_visible(&self) -> Option { + if let Some(remote_client) = &self.remote_client { + remote_client.remote_consistent_lsn_visible() } else { None } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 7d1e9b4a39..0831b9ceda 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -370,8 +370,9 @@ pub(super) async fn handle_walreceiver_connection( })?; if let Some(last_lsn) = status_update { - let timeline_remote_consistent_lsn = - timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); + let timeline_remote_consistent_lsn = timeline + .get_remote_consistent_lsn_visible() + .unwrap_or(Lsn(0)); // The last LSN we processed. It is not guaranteed to survive pageserver crash. let last_received_lsn = last_lsn; diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index cfa5e9db27..3327392c0a 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -56,7 +56,13 @@ pub(crate) struct UploadQueueInitialized { /// uploaded. `Lsn(0)` if nothing was uploaded yet. /// Unlike `latest_files` or `latest_metadata`, this value is never ahead. /// Safekeeper can rely on it to make decisions for WAL storage. - pub(crate) last_uploaded_consistent_lsn: Lsn, + /// + /// visible_remote_consistent_lsn is only updated after our generation has been validated with + /// the control plane: this is coordinated via the channel defined below. 
+ pub(crate) projected_remote_consistent_lsn: Option, + pub(crate) visible_remote_consistent_lsn: Option, + pub(crate) visible_remote_consistent_lsn_rx: tokio::sync::mpsc::Receiver, + pub(crate) visible_remote_consistent_lsn_tx: tokio::sync::mpsc::Sender, // Breakdown of different kinds of tasks currently in-progress pub(crate) num_inprogress_layer_uploads: usize, @@ -79,6 +85,18 @@ impl UploadQueueInitialized { pub(super) fn no_pending_work(&self) -> bool { self.inprogress_tasks.is_empty() && self.queued_operations.is_empty() } + + pub(super) fn get_last_remote_consistent_lsn_visible(&mut self) -> Option { + while let Ok(lsn) = self.visible_remote_consistent_lsn_rx.try_recv() { + self.visible_remote_consistent_lsn = Some(lsn); + } + + self.visible_remote_consistent_lsn + } + + pub(super) fn get_last_remote_consistent_lsn_projected(&mut self) -> Option { + self.projected_remote_consistent_lsn + } } #[derive(Clone, Copy)] @@ -107,14 +125,18 @@ impl UploadQueue { info!("initializing upload queue for empty remote"); + let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) = + tokio::sync::mpsc::channel(16); + let state = UploadQueueInitialized { // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead. latest_files: HashMap::new(), latest_files_changes_since_metadata_upload_scheduled: 0, latest_metadata: metadata.clone(), - // We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent - // safekeepers from garbage-collecting anything. 
- last_uploaded_consistent_lsn: Lsn(0), + projected_remote_consistent_lsn: None, + visible_remote_consistent_lsn: None, + visible_remote_consistent_lsn_tx, + visible_remote_consistent_lsn_rx, // what follows are boring default initializations task_counter: 0, num_inprogress_layer_uploads: 0, @@ -152,11 +174,16 @@ impl UploadQueue { index_part.metadata.disk_consistent_lsn() ); + let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) = + tokio::sync::mpsc::channel(16); let state = UploadQueueInitialized { latest_files: files, latest_files_changes_since_metadata_upload_scheduled: 0, latest_metadata: index_part.metadata.clone(), - last_uploaded_consistent_lsn: index_part.metadata.disk_consistent_lsn(), + projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()), + visible_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()), + visible_remote_consistent_lsn_tx, + visible_remote_consistent_lsn_rx, // what follows are boring default initializations task_counter: 0, num_inprogress_layer_uploads: 0,