diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 2526786821..7e321ca744 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -24,6 +24,7 @@ use tracing::Instrument; use tracing::{self, debug, error}; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; +use utils::lsn::AtomicLsn; use utils::lsn::Lsn; use self::backend::BackendQueueWorker; @@ -374,7 +375,7 @@ impl std::fmt::Display for DeletionList { struct PendingLsn { projected: Lsn, - result_tx: tokio::sync::mpsc::Sender, + result_slot: Arc, } struct TenantLsnState { @@ -382,10 +383,6 @@ struct TenantLsnState { // In what generation was the most recent update proposed? generation: Generation, - - // Any timelines' LSNs projected since this flag was last cleared? - // (optimization so that reader doesn't have to walk timelines) - dirty: bool, } struct VisibleLsnUpdates { @@ -465,7 +462,7 @@ impl DeletionQueueClient { timeline_id: TimelineId, current_generation: Generation, lsn: Lsn, - result_tx: tokio::sync::mpsc::Sender, + result_slot: Arc, ) { let mut locked = self .lsn_table @@ -475,7 +472,6 @@ impl DeletionQueueClient { let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState { timelines: HashMap::new(), generation: current_generation, - dirty: true, }); if tenant_entry.generation != current_generation { @@ -485,13 +481,11 @@ impl DeletionQueueClient { tenant_entry.generation = current_generation; } - tenant_entry.dirty = true; - tenant_entry.timelines.insert( timeline_id, PendingLsn { projected: lsn, - result_tx, + result_slot, }, ); } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index f214616be9..f4b67d3d20 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -161,7 +161,7 @@ impl BackendQueueWorker { for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines { // Drop result of send: it is legal for the Timeline to have been dropped along // with its queue receiver while we were doing validation. - drop(pending_lsn.result_tx.send(pending_lsn.projected).await); + pending_lsn.result_slot.store(pending_lsn.projected); } } else { // If we failed validation, then do not apply any of the projected updates diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 874304bfe3..7eb00c0405 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -434,10 +434,11 @@ impl RemoteTimelineClient { pub fn remote_consistent_lsn_visible(&self) -> Option { match &mut *self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, - UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_visible(), - UploadQueue::Stopped(q) => q - .upload_queue_for_deletion - .get_last_remote_consistent_lsn_visible(), + UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()), + UploadQueue::Stopped(q) => Some( + q.upload_queue_for_deletion + .get_last_remote_consistent_lsn_visible(), + ), } } @@ -1227,10 +1228,10 @@ impl RemoteTimelineClient { upload_queue.projected_remote_consistent_lsn = Some(lsn); if self.generation.is_none() { // Legacy mode: skip validating generation - upload_queue.visible_remote_consistent_lsn = Some(lsn); + upload_queue.visible_remote_consistent_lsn.store(lsn); None } else { - Some((lsn, upload_queue.visible_remote_consistent_lsn_tx.clone())) + Some((lsn, upload_queue.visible_remote_consistent_lsn.clone())) } } UploadOp::Delete(_) => { @@ -1245,14 +1246,14 @@ impl RemoteTimelineClient { lsn_update }; - if let Some((lsn, tx)) = lsn_update { + if let Some((lsn, slot)) = lsn_update { self.deletion_queue_client .update_remote_consistent_lsn( self.tenant_id, self.timeline_id, self.generation, lsn, - tx, + slot, ) .await; } @@ -1340,17 +1341,15 @@ impl RemoteTimelineClient { // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut // but for this use case it doesnt really makes sense to bring unsafe code only for this usage point. // Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it. - let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) = - tokio::sync::mpsc::channel(16); let upload_queue_for_deletion = UploadQueueInitialized { task_counter: 0, latest_files: initialized.latest_files.clone(), latest_files_changes_since_metadata_upload_scheduled: 0, latest_metadata: initialized.latest_metadata.clone(), - projected_remote_consistent_lsn: initialized.visible_remote_consistent_lsn, - visible_remote_consistent_lsn: initialized.visible_remote_consistent_lsn, - visible_remote_consistent_lsn_tx, - visible_remote_consistent_lsn_rx, + projected_remote_consistent_lsn: None, + visible_remote_consistent_lsn: initialized + .visible_remote_consistent_lsn + .clone(), num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 3327392c0a..f8cac1e681 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -9,6 +9,7 @@ use std::fmt::Debug; use chrono::NaiveDateTime; use std::sync::Arc; use tracing::info; +use utils::lsn::AtomicLsn; use std::sync::atomic::AtomicU32; use utils::lsn::Lsn; @@ -58,11 +59,9 @@ pub(crate) struct UploadQueueInitialized { /// Safekeeper can rely on it to make decisions for WAL storage. /// /// visible_remote_consistent_lsn is only updated after our generation has been validated with - /// the control plane: this is coordinated via the channel defined below. + /// the control plane. pub(crate) projected_remote_consistent_lsn: Option, - pub(crate) visible_remote_consistent_lsn: Option, - pub(crate) visible_remote_consistent_lsn_rx: tokio::sync::mpsc::Receiver, - pub(crate) visible_remote_consistent_lsn_tx: tokio::sync::mpsc::Sender, + pub(crate) visible_remote_consistent_lsn: Arc, // Breakdown of different kinds of tasks currently in-progress pub(crate) num_inprogress_layer_uploads: usize, @@ -86,12 +85,8 @@ impl UploadQueueInitialized { self.inprogress_tasks.is_empty() && self.queued_operations.is_empty() } - pub(super) fn get_last_remote_consistent_lsn_visible(&mut self) -> Option { - while let Ok(lsn) = self.visible_remote_consistent_lsn_rx.try_recv() { - self.visible_remote_consistent_lsn = Some(lsn); - } - - self.visible_remote_consistent_lsn + pub(super) fn get_last_remote_consistent_lsn_visible(&mut self) -> Lsn { + self.visible_remote_consistent_lsn.load() } pub(super) fn get_last_remote_consistent_lsn_projected(&mut self) -> Option { @@ -125,18 +120,13 @@ impl UploadQueue { info!("initializing upload queue for empty remote"); - let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) = - tokio::sync::mpsc::channel(16); - let state = UploadQueueInitialized { // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead. latest_files: HashMap::new(), latest_files_changes_since_metadata_upload_scheduled: 0, latest_metadata: metadata.clone(), projected_remote_consistent_lsn: None, - visible_remote_consistent_lsn: None, - visible_remote_consistent_lsn_tx, - visible_remote_consistent_lsn_rx, + visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)), // what follows are boring default initializations task_counter: 0, num_inprogress_layer_uploads: 0, @@ -174,16 +164,14 @@ impl UploadQueue { index_part.metadata.disk_consistent_lsn() ); - let (visible_remote_consistent_lsn_tx, visible_remote_consistent_lsn_rx) = - tokio::sync::mpsc::channel(16); let state = UploadQueueInitialized { latest_files: files, latest_files_changes_since_metadata_upload_scheduled: 0, latest_metadata: index_part.metadata.clone(), projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()), - visible_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()), - visible_remote_consistent_lsn_tx, - visible_remote_consistent_lsn_rx, + visible_remote_consistent_lsn: Arc::new( + index_part.metadata.disk_consistent_lsn().into(), + ), // what follows are boring default initializations task_counter: 0, num_inprogress_layer_uploads: 0,