diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index cccc64685b..0bf851a8d7 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -153,7 +153,7 @@ impl FlushOp { #[derive(Clone, Debug)] pub struct DeletionQueueClient { - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, lsn_table: Arc>, @@ -416,7 +416,7 @@ pub enum DeletionQueueError { impl DeletionQueueClient { pub(crate) fn broken() -> Self { // Channels whose receivers are immediately dropped. - let (tx, _rx) = tokio::sync::mpsc::channel(1); + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1); Self { tx, @@ -428,12 +428,12 @@ impl DeletionQueueClient { /// This is cancel-safe. If you drop the future before it completes, the message /// is not pushed, although in the context of the deletion queue it doesn't matter: once /// we decide to do a deletion the decision is always final. - async fn do_push( + fn do_push( &self, - queue: &tokio::sync::mpsc::Sender, + queue: &tokio::sync::mpsc::UnboundedSender, msg: T, ) -> Result<(), DeletionQueueError> { - match queue.send(msg).await { + match queue.send(msg) { Ok(_) => Ok(()), Err(e) => { // This shouldn't happen, we should shut down all tenants before @@ -445,7 +445,7 @@ impl DeletionQueueClient { } } - pub(crate) async fn recover( + pub(crate) fn recover( &self, attached_tenants: HashMap, ) -> Result<(), DeletionQueueError> { @@ -453,7 +453,6 @@ impl DeletionQueueClient { &self.tx, ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }), ) - .await } /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside @@ -526,6 +525,21 @@ impl DeletionQueueClient { return self.flush_immediate().await; } + self.push_layers_sync(tenant_id, timeline_id, current_generation, layers) + } + + /// When a Tenant has a generation, push_layers is always synchronous because + /// the ListValidator channel is an unbounded channel. + /// + /// This can be merged into push_layers when we remove the Generation-less mode + /// support (``) + pub(crate) fn push_layers_sync( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + current_generation: Generation, + layers: Vec<(LayerFileName, Generation)>, + ) -> Result<(), DeletionQueueError> { metrics::DELETION_QUEUE .keys_submitted .inc_by(layers.len() as u64); @@ -539,17 +553,16 @@ impl DeletionQueueClient { objects: Vec::new(), }), ) - .await } /// This is cancel-safe. If you drop the future the flush may still happen in the background. async fn do_flush( &self, - queue: &tokio::sync::mpsc::Sender, + queue: &tokio::sync::mpsc::UnboundedSender, msg: T, rx: tokio::sync::oneshot::Receiver<()>, ) -> Result<(), DeletionQueueError> { - self.do_push(queue, msg).await?; + self.do_push(queue, msg)?; if rx.await.is_err() { // This shouldn't happen if tenants are shut down before deletion queue. If we // encounter a bug like this, then a flusher will incorrectly believe it has flushed @@ -570,6 +583,18 @@ impl DeletionQueueClient { .await } + /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where + /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant + /// detach where flushing is nice but not necessary. + /// + /// This function provides no guarantees of work being done. + pub fn flush_advisory(&self) { + let (flush_op, _) = FlushOp::new(); + + // Transmit the flush message, ignoring any result (such as a closed channel during shutdown). + drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op))); + } + // Wait until all previous deletions are executed pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> { debug!("flush_execute: flushing to deletion lists..."); @@ -586,9 +611,7 @@ impl DeletionQueueClient { // Flush any immediate-mode deletions (the above backend flush will only flush // the executor if deletions had flowed through the backend) debug!("flush_execute: flushing execution..."); - let (flush_op, rx) = FlushOp::new(); - self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx) - .await?; + self.flush_immediate().await?; debug!("flush_execute: finished flushing execution..."); Ok(()) } @@ -643,8 +666,10 @@ impl DeletionQueue { where C: ControlPlaneGenerationsApi + Send + Sync, { - // Deep channel: it consumes deletions from all timelines and we do not want to block them - let (tx, rx) = tokio::sync::mpsc::channel(16384); + // Unbounded channel: enables non-async functions to submit deletions. The actual length is + // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent + // enough to avoid this taking pathologically large amount of memory. + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16); @@ -957,7 +982,7 @@ mod test { // Basic test that the deletion queue processes the deletions we pass into it let ctx = setup("deletion_queue_smoke").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let tenant_id = ctx.harness.tenant_id; @@ -1025,7 +1050,7 @@ mod test { async fn deletion_queue_validation() -> anyhow::Result<()> { let ctx = setup("deletion_queue_validation").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; // Generation that the control plane thinks is current let latest_generation = Generation::new(0xdeadbeef); @@ -1082,7 +1107,7 @@ mod test { // Basic test that the deletion queue processes the deletions we pass into it let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; let tenant_id = ctx.harness.tenant_id; @@ -1145,9 +1170,7 @@ mod test { drop(client); ctx.restart().await; let client = ctx.deletion_queue.new_client(); - client - .recover(HashMap::from([(tenant_id, now_generation)])) - .await?; + client.recover(HashMap::from([(tenant_id, now_generation)]))?; info!("Flush-executing"); client.flush_execute().await?; @@ -1173,7 +1196,7 @@ pub(crate) mod mock { }; pub struct ConsumerState { - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, executor_rx: tokio::sync::mpsc::Receiver, } @@ -1250,7 +1273,7 @@ pub(crate) mod mock { } pub struct MockDeletionQueue { - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, executed: Arc, remote_storage: Option, @@ -1260,7 +1283,7 @@ pub(crate) mod mock { impl MockDeletionQueue { pub fn new(remote_storage: Option) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(16384); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384); let executed = Arc::new(AtomicUsize::new(0)); diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index e846340373..21b8c356cd 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -85,7 +85,7 @@ pub(super) struct ListWriter { conf: &'static PageServerConf, // Incoming frontend requests to delete some keys - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, // Outbound requests to the backend to execute deletion lists we have composed. tx: tokio::sync::mpsc::Sender, @@ -111,7 +111,7 @@ impl ListWriter { pub(super) fn new( conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, tx: tokio::sync::mpsc::Sender, cancel: CancellationToken, ) -> Self { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e0529aeafa..0597a977c0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -575,9 +575,14 @@ async fn tenant_detach_handler( let state = get_state(&request); let conf = state.conf; - mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false)) - .instrument(info_span!("tenant_detach", %tenant_id)) - .await?; + mgr::detach_tenant( + conf, + tenant_id, + detach_ignored.unwrap_or(false), + &state.deletion_queue_client, + ) + .instrument(info_span!("tenant_detach", %tenant_id)) + .await?; json_response(StatusCode::OK, ()) } @@ -1034,7 +1039,7 @@ async fn put_tenant_location_config_handler( // The `Detached` state is special, it doesn't upsert a tenant, it removes // its local disk content and drops it from memory. if let LocationConfigMode::Detached = request_data.config.mode { - mgr::detach_tenant(conf, tenant_id, true) + mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client) .instrument(info_span!("tenant_detach", %tenant_id)) .await?; return json_response(StatusCode::OK, ()); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 57c1b5f070..264f8a1ee0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock}; use std::time::{Duration, Instant}; use self::config::AttachedLocationConfig; +use self::config::AttachmentMode; use self::config::LocationConf; use self::config::TenantConf; use self::delete::DeleteTenantFlow; @@ -2076,6 +2077,15 @@ impl Tenant { } } } + + pub(crate) fn get_attach_mode(&self) -> AttachmentMode { + self.tenant_conf + .read() + .unwrap() + .location + .attach_mode + .clone() + } } /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a92fbccdea..35b3be6d61 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -24,7 +24,7 @@ use crate::control_plane_client::{ }; use crate::deletion_queue::DeletionQueueClient; use crate::task_mgr::{self, TaskKind}; -use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt}; +use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::{ create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState, @@ -206,8 +206,7 @@ async fn init_load_generations( if resources.remote_storage.is_some() { resources .deletion_queue_client - .recover(generations.clone()) - .await?; + .recover(generations.clone())?; } Ok(Some(generations)) @@ -695,6 +694,18 @@ pub(crate) async fn upsert_location( if let Some(tenant) = shutdown_tenant { let (_guard, progress) = utils::completion::channel(); + + match tenant.get_attach_mode() { + AttachmentMode::Single | AttachmentMode::Multi => { + // Before we leave our state as the presumed holder of the latest generation, + // flush any outstanding deletions to reduce the risk of leaking objects. + deletion_queue_client.flush_advisory() + } + AttachmentMode::Stale => { + // If we're stale there's not point trying to flush deletions + } + }; + info!("Shutting down attached tenant"); match tenant.shutdown(progress, false).await { Ok(()) => {} @@ -849,8 +860,16 @@ pub async fn detach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, detach_ignored: bool, + deletion_queue_client: &DeletionQueueClient, ) -> Result<(), TenantStateError> { - let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?; + let tmp_path = detach_tenant0( + conf, + &TENANTS, + tenant_id, + detach_ignored, + deletion_queue_client, + ) + .await?; // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory. // After a tenant is detached, there are no more task_mgr tasks for that tenant_id. let task_tenant_id = None; @@ -875,6 +894,7 @@ async fn detach_tenant0( tenants: &tokio::sync::RwLock, tenant_id: TenantId, detach_ignored: bool, + deletion_queue_client: &DeletionQueueClient, ) -> Result { let tenant_dir_rename_operation = |tenant_id_to_clean| async move { let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean); @@ -886,6 +906,10 @@ async fn detach_tenant0( let removal_result = remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await; + // Flush pending deletions, so that they have a good chance of passing validation + // before this tenant is potentially re-attached elsewhere. + deletion_queue_client.flush_advisory(); + // Ignored tenants are not present in memory and will bail the removal from memory operation. // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then. if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {