diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 795914107e..ec0242b1a1 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,7 +8,6 @@ use anyhow::{anyhow, Context}; use clap::{Arg, ArgAction, Command}; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; -use pageserver::control_plane_client::ControlPlaneClient; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; use pageserver::task_mgr::WALRECEIVER_RUNTIME; @@ -355,11 +354,7 @@ fn start_pageserver( let remote_storage = create_remote_storage_client(conf)?; // Set up deletion queue - let (deletion_queue, deletion_workers) = DeletionQueue::new( - remote_storage.clone(), - ControlPlaneClient::new(conf, &shutdown_pageserver), - conf, - ); + let (deletion_queue, deletion_workers) = DeletionQueue::new(remote_storage.clone(), conf); if let Some(deletion_workers) = deletion_workers { deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); } diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs deleted file mode 100644 index 0953af0a47..0000000000 --- a/pageserver/src/control_plane_client.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::collections::HashMap; - -use pageserver_api::control_api::{ - ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse, -}; -use serde::{de::DeserializeOwned, Serialize}; -use tokio_util::sync::CancellationToken; -use url::Url; -use utils::{ - backoff, - generation::Generation, - id::{NodeId, TenantId}, -}; - -use crate::config::PageServerConf; - -/// The Pageserver's client for using the control plane API: this is a small subset -/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md) -pub struct ControlPlaneClient { - http_client: reqwest::Client, - base_url: Url, - node_id: NodeId, - cancel: CancellationToken, -} - -#[async_trait::async_trait] -pub trait ControlPlaneGenerationsApi { - async fn re_attach(&self) -> anyhow::Result>; - async fn validate( - &self, - tenants: Vec<(TenantId, Generation)>, - ) -> anyhow::Result>; -} - -impl ControlPlaneClient { - /// A None return value indicates that the input `conf` object does not have control - /// plane API enabled. - pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option { - let mut url = match conf.control_plane_api.as_ref() { - Some(u) => u.clone(), - None => return None, - }; - - if let Ok(mut segs) = url.path_segments_mut() { - // This ensures that `url` ends with a slash if it doesn't already. - // That way, we can subsequently use join() to safely attach extra path elements. - segs.pop_if_empty().push(""); - } - - let client = reqwest::ClientBuilder::new() - .build() - .expect("Failed to construct http client"); - - Some(Self { - http_client: client, - base_url: url, - node_id: conf.id, - cancel: cancel.clone(), - }) - } - - async fn retry_http_forever(&self, url: &url::Url, request: R) -> Result - where - R: Serialize, - T: DeserializeOwned, - { - #[derive(thiserror::Error, Debug)] - enum RemoteAttemptError { - #[error("shutdown")] - Shutdown, - #[error("remote: {0}")] - Remote(reqwest::Error), - } - - match backoff::retry( - || async { - let response = self - .http_client - .post(url.clone()) - .json(&request) - .send() - .await - .map_err(RemoteAttemptError::Remote)?; - - response - .error_for_status_ref() - .map_err(RemoteAttemptError::Remote)?; - response - .json::() - .await - .map_err(RemoteAttemptError::Remote) - }, - |_| false, - 3, - u32::MAX, - "calling control plane generation validation API", - backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown), - ) - .await - { - Err(RemoteAttemptError::Shutdown) => Err(anyhow::anyhow!("Shutting down")), - Err(RemoteAttemptError::Remote(_)) => { - panic!("We retry forever, this should never be reached"); - } - Ok(r) => Ok(r), - } - } -} - -#[async_trait::async_trait] -impl ControlPlaneGenerationsApi for ControlPlaneClient { - /// Block until we get a successful response - async fn re_attach(&self) -> anyhow::Result> { - let re_attach_path = self - .base_url - .join("re-attach") - .expect("Failed to build re-attach path"); - let request = ReAttachRequest { - node_id: self.node_id, - }; - - let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?; - tracing::info!( - "Received re-attach response with {} tenants", - response.tenants.len() - ); - - Ok(response - .tenants - .into_iter() - .map(|t| (t.id, Generation::new(t.generation))) - .collect::>()) - } - - async fn validate( - &self, - tenants: Vec<(TenantId, Generation)>, - ) -> anyhow::Result> { - let re_attach_path = self - .base_url - .join("validate") - .expect("Failed to build validate path"); - - let request = ValidateRequest { - tenants: tenants - .into_iter() - .map(|(id, gen)| ValidateRequestTenant { - id, - gen: gen - .into() - .expect("Generation should always be valid for a Tenant doing deletions"), - }) - .collect(), - }; - - let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?; - - Ok(response - .tenants - .into_iter() - .map(|rt| (rt.id, rt.valid)) - .collect()) - } -} diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index a65e39dcd0..27f5441dbc 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -1,22 +1,8 @@ -mod backend; mod executor; -mod frontend; -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; - -use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::metrics::DELETION_QUEUE_SUBMITTED; use crate::tenant::remote_timeline_client::remote_layer_path; -use crate::tenant::remote_timeline_client::remote_timeline_path; -use anyhow::Context; -use hex::FromHex; use remote_storage::{GenericRemoteStorage, RemotePath}; -use serde::Deserialize; -use serde::Serialize; -use serde_with::serde_as; use thiserror::Error; use tokio; use tokio_util::sync::CancellationToken; @@ -24,17 +10,9 @@ use tracing::Instrument; use tracing::{self, debug, error}; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; -use utils::lsn::AtomicLsn; -use utils::lsn::Lsn; -use self::backend::BackendQueueWorker; use self::executor::ExecutorWorker; -use self::frontend::DeletionOp; -use self::frontend::FrontendQueueWorker; -use self::frontend::RecoverOp; -use backend::BackendQueueMessage; use executor::ExecutorMessage; -use frontend::FrontendQueueMessage; use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; @@ -81,32 +59,12 @@ pub struct DeletionQueue { /// Opaque wrapper around individual worker tasks, to avoid making the /// worker objects themselves public -pub struct DeletionQueueWorkers -where - C: ControlPlaneGenerationsApi + Send + Sync, -{ - frontend: FrontendQueueWorker, - backend: BackendQueueWorker, +pub struct DeletionQueueWorkers { executor: ExecutorWorker, } -impl DeletionQueueWorkers -where - C: ControlPlaneGenerationsApi + Send + Sync + 'static, -{ +impl DeletionQueueWorkers { pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> { - let jh_frontend = runtime.spawn(async move { - self.frontend - .background() - .instrument(tracing::info_span!(parent:None, "deletion frontend")) - .await - }); - let jh_backend = runtime.spawn(async move { - self.backend - .background() - .instrument(tracing::info_span!(parent:None, "deletion backend")) - .await - }); let jh_executor = runtime.spawn(async move { self.executor .background() @@ -116,8 +74,6 @@ where runtime.spawn({ async move { - jh_frontend.await.expect("error joining frontend worker"); - jh_backend.await.expect("error joining backend worker"); drop(jh_executor.await.expect("error joining executor worker")); } }) @@ -140,273 +96,7 @@ impl FlushOp { #[derive(Clone, Debug)] pub struct DeletionQueueClient { - tx: tokio::sync::mpsc::Sender, executor_tx: tokio::sync::mpsc::Sender, - - lsn_table: Arc>, -} - -#[derive(Debug, Serialize, Deserialize)] -struct TenantDeletionList { - /// For each Timeline, a list of key fragments to append to the timeline remote path - /// when reconstructing a full key - #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] - timelines: HashMap>, - - /// The generation in which this deletion was emitted: note that this may not be the - /// same as the generation of any layers being deleted. The generation of the layer - /// has already been absorbed into the keys in `objects` - generation: Generation, -} - -impl TenantDeletionList { - pub(crate) fn len(&self) -> usize { - self.timelines.values().map(|v| v.len()).sum() - } -} - -/// For HashMaps using a `hex` compatible key, where we would like to encode the key as a string -fn to_hex_map(input: &HashMap, serializer: S) -> Result -where - S: serde::Serializer, - V: Serialize, - I: AsRef<[u8]>, -{ - let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone())); - - transformed - .collect::>() - .serialize(serializer) -} - -/// For HashMaps using a FromHex key, where we would like to decode the key -fn from_hex_map<'de, D, V, I>(deserializer: D) -> Result, D::Error> -where - D: serde::de::Deserializer<'de>, - V: Deserialize<'de>, - I: FromHex + std::hash::Hash + Eq, -{ - let hex_map = HashMap::::deserialize(deserializer)?; - hex_map - .into_iter() - .map(|(k, v)| { - I::from_hex(k) - .map(|k| (k, v)) - .map_err(|_| serde::de::Error::custom("Invalid hex ID")) - }) - .collect() -} - -#[serde_as] -#[derive(Debug, Serialize, Deserialize)] -struct DeletionList { - /// Serialization version, for future use - version: u8, - - /// Used for constructing a unique key for each deletion list we write out. - sequence: u64, - - /// To avoid repeating tenant/timeline IDs in every key, we store keys in - /// nested HashMaps by TenantTimelineID. Each Tenant only appears once - /// with one unique generation ID: if someone tries to push a second generation - /// ID for the same tenant, we will start a new DeletionList. - #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] - tenants: HashMap, - - /// Avoid having to walk `tenants` to calculate the number of keys in - /// the nested deletion lists - size: usize, - - /// Set to true when the list has undergone validation with the control - /// plane and the remaining contents of `tenants` are valid. A list may - /// also be implicitly marked valid by DeletionHeader.validated_sequence - /// advancing to >= DeletionList.sequence - #[serde(default)] - #[serde(skip_serializing_if = "std::ops::Not::not")] - validated: bool, -} - -#[serde_as] -#[derive(Debug, Serialize, Deserialize)] -struct DeletionHeader { - /// Serialization version, for future use - version: u8, - - /// The highest sequence number (inclusive) that has been validated. All deletion - /// lists on disk with a sequence <= this value are safe to execute. - validated_sequence: u64, -} - -impl DeletionHeader { - const VERSION_LATEST: u8 = 1; - - fn new(validated_sequence: u64) -> Self { - Self { - version: Self::VERSION_LATEST, - validated_sequence, - } - } - - async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { - debug!("Saving deletion list header {:?}", self); - let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; - let header_path = conf.deletion_header_path(); - - tokio::fs::write(&header_path, header_bytes) - .await - .map_err(|e| anyhow::anyhow!(e)) - } -} - -impl DeletionList { - const VERSION_LATEST: u8 = 1; - fn new(sequence: u64) -> Self { - Self { - version: Self::VERSION_LATEST, - sequence, - tenants: HashMap::new(), - size: 0, - validated: false, - } - } - - fn drain(&mut self) -> Self { - let mut tenants = HashMap::new(); - std::mem::swap(&mut self.tenants, &mut tenants); - let other = Self { - version: Self::VERSION_LATEST, - sequence: self.sequence, - tenants, - size: self.size, - validated: self.validated, - }; - self.size = 0; - other - } - - fn is_empty(&self) -> bool { - self.tenants.is_empty() - } - - fn len(&self) -> usize { - self.size - } - - /// Returns true if the push was accepted, false if the caller must start a new - /// deletion list. - fn push( - &mut self, - tenant: &TenantId, - timeline: &TimelineId, - generation: Generation, - objects: &mut Vec, - ) -> bool { - if objects.is_empty() { - // Avoid inserting an empty TimelineDeletionList: this preserves the property - // that if we have no keys, then self.objects is empty (used in Self::is_empty) - return true; - } - - let tenant_entry = self - .tenants - .entry(*tenant) - .or_insert_with(|| TenantDeletionList { - timelines: HashMap::new(), - generation, - }); - - if tenant_entry.generation != generation { - // Only one generation per tenant per list: signal to - // caller to start a new list. - return false; - } - - let timeline_entry = tenant_entry - .timelines - .entry(*timeline) - .or_insert_with(Vec::new); - - let timeline_remote_path = remote_timeline_path(tenant, timeline); - - self.size += objects.len(); - timeline_entry.extend(objects.drain(..).map(|p| { - p.strip_prefix(&timeline_remote_path) - .expect("Timeline paths always start with the timeline prefix") - .to_string_lossy() - .to_string() - })); - true - } - - fn drain_paths(&mut self) -> Vec { - let mut tenants = HashMap::new(); - std::mem::swap(&mut tenants, &mut self.tenants); - - let mut result = Vec::new(); - for (tenant, tenant_deletions) in tenants.into_iter() { - for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() { - let timeline_remote_path = remote_timeline_path(&tenant, &timeline); - result.extend( - timeline_layers - .into_iter() - .map(|l| timeline_remote_path.join(&PathBuf::from(l))), - ); - } - } - - result - } - - async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { - let path = conf.deletion_list_path(self.sequence); - - let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - tokio::fs::write(&path, &bytes).await?; - tokio::fs::File::open(&path).await?.sync_all().await?; - Ok(()) - } -} - -impl std::fmt::Display for DeletionList { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "DeletionList", - self.sequence, - self.tenants.len(), - self.size - ) - } -} - -struct PendingLsn { - projected: Lsn, - result_slot: Arc, -} - -struct TenantLsnState { - timelines: HashMap, - - // In what generation was the most recent update proposed? - generation: Generation, -} - -struct VisibleLsnUpdates { - tenants: HashMap, -} - -impl VisibleLsnUpdates { - fn new() -> Self { - Self { - tenants: HashMap::new(), - } - } -} - -impl std::fmt::Debug for VisibleLsnUpdates { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len()) - } } #[derive(Error, Debug)] @@ -418,82 +108,8 @@ pub enum DeletionQueueError { impl DeletionQueueClient { pub(crate) fn broken() -> Self { // Channels whose receivers are immediately dropped. - let (tx, _rx) = tokio::sync::mpsc::channel(1); let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1); - Self { - tx, - executor_tx, - lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), - } - } - - async fn do_push( - &self, - queue: &tokio::sync::mpsc::Sender, - msg: T, - ) -> Result<(), DeletionQueueError> { - match queue.send(msg).await { - Ok(_) => Ok(()), - Err(e) => { - // This shouldn't happen, we should shut down all tenants before - // we shut down the global delete queue. If we encounter a bug like this, - // we may leak objects as deletions won't be processed. - error!("Deletion queue closed while pushing, shutting down? ({e})"); - Err(DeletionQueueError::ShuttingDown) - } - } - } - - pub(crate) async fn recover( - &self, - attached_tenants: HashMap, - ) -> Result<(), DeletionQueueError> { - self.do_push( - &self.tx, - FrontendQueueMessage::Recover(RecoverOp { attached_tenants }), - ) - .await - } - - /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside - /// world, it must validate its generation number before doing so. Rather than do this synchronously, - /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most - /// recently validated separately. - /// - /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The - /// backend will later wake up and notice that the tenant's generation requires validation. - pub(crate) async fn update_remote_consistent_lsn( - &self, - tenant_id: TenantId, - timeline_id: TimelineId, - current_generation: Generation, - lsn: Lsn, - result_slot: Arc, - ) { - let mut locked = self - .lsn_table - .write() - .expect("Lock should never be poisoned"); - - let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState { - timelines: HashMap::new(), - generation: current_generation, - }); - - if tenant_entry.generation != current_generation { - // Generation might have changed if we were detached and then re-attached: in this case, - // state from the previous generation cannot be trusted. - tenant_entry.timelines.clear(); - tenant_entry.generation = current_generation; - } - - tenant_entry.timelines.insert( - timeline_id, - PendingLsn { - projected: lsn, - result_slot, - }, - ); + Self { executor_tx } } /// Submit a list of layers for deletion: this function will return before the deletion is @@ -510,89 +126,21 @@ impl DeletionQueueClient { current_generation: Generation, layers: Vec<(LayerFileName, Generation)>, ) -> Result<(), DeletionQueueError> { - if current_generation.is_none() { - debug!("Enqueuing deletions in legacy mode, skipping queue"); - let mut layer_paths = Vec::new(); - for (layer, generation) in layers { - layer_paths.push(remote_layer_path( - &tenant_id, - &timeline_id, - &layer, - generation, - )); - } - self.push_immediate(layer_paths).await?; - return self.flush_immediate().await; + if !current_generation.is_none() { + unimplemented!("generation support not yet implemented"); } - - DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64); - self.do_push( - &self.tx, - FrontendQueueMessage::Delete(DeletionOp { - tenant_id, - timeline_id, - layers, - generation: current_generation, - objects: Vec::new(), - }), - ) - .await - } - - async fn do_flush( - &self, - queue: &tokio::sync::mpsc::Sender, - msg: T, - rx: tokio::sync::oneshot::Receiver<()>, - ) -> Result<(), DeletionQueueError> { - self.do_push(queue, msg).await?; - if rx.await.is_err() { - // This shouldn't happen if tenants are shut down before deletion queue. If we - // encounter a bug like this, then a flusher will incorrectly believe it has flushed - // when it hasn't, possibly leading to leaking objects. - error!("Deletion queue dropped flush op while client was still waiting"); - Err(DeletionQueueError::ShuttingDown) - } else { - Ok(()) + debug!("Enqueuing deletions in legacy mode, skipping queue"); + let mut layer_paths = Vec::new(); + for (layer, generation) in layers { + layer_paths.push(remote_layer_path( + &tenant_id, + &timeline_id, + &layer, + generation, + )); } - } - - /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList) - pub async fn flush(&self) -> Result<(), DeletionQueueError> { - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - self.do_flush(&self.tx, FrontendQueueMessage::Flush(FlushOp { tx }), rx) - .await - } - - // Wait until all previous deletions are executed - pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> { - debug!("flush_execute: flushing to deletion lists..."); - // Flush any buffered work to deletion lists - self.flush().await?; - - // Flush the backend into the executor of deletion lists - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - debug!("flush_execute: flushing backend..."); - self.do_flush( - &self.tx, - FrontendQueueMessage::FlushExecute(FlushOp { tx }), - rx, - ) - .await?; - debug!("flush_execute: finished flushing backend..."); - - // Flush any immediate-mode deletions (the above backend flush will only flush - // the executor if deletions had flowed through the backend) - debug!("flush_execute: flushing execution..."); - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - self.do_flush( - &self.executor_tx, - ExecutorMessage::Flush(FlushOp { tx }), - rx, - ) - .await?; - debug!("flush_execute: finished flushing execution..."); - Ok(()) + self.push_immediate(layer_paths).await?; + return self.flush_immediate().await; } /// This interface bypasses the persistent deletion queue, and any validation @@ -635,26 +183,14 @@ impl DeletionQueue { /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice. /// /// If remote_storage is None, then the returned workers will also be None. - pub fn new( + pub fn new( remote_storage: Option, - control_plane_client: Option, - conf: &'static PageServerConf, - ) -> (Self, Option>) - where - C: ControlPlaneGenerationsApi + Send + Sync, - { - // Deep channel: it consumes deletions from all timelines and we do not want to block them - let (tx, rx) = tokio::sync::mpsc::channel(16384); - - // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions - let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16); - + _conf: &'static PageServerConf, + ) -> (Self, Option) { // Shallow channel: it carries lists of paths, and we expect the main queueing to // happen in the backend (persistent), not in this queue. let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16); - let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); - // The deletion queue has an independent cancellation token to // the general pageserver shutdown token, because it stays alive a bit // longer to flush after Tenants have all been torn down. @@ -664,11 +200,7 @@ impl DeletionQueue { None => { return ( Self { - client: DeletionQueueClient { - tx, - executor_tx, - lsn_table: lsn_table.clone(), - }, + client: DeletionQueueClient { executor_tx }, cancel, }, None, @@ -680,505 +212,23 @@ impl DeletionQueue { ( Self { client: DeletionQueueClient { - tx, executor_tx: executor_tx.clone(), - lsn_table: lsn_table.clone(), }, cancel: cancel.clone(), }, Some(DeletionQueueWorkers { - frontend: FrontendQueueWorker::new(conf, rx, backend_tx, cancel.clone()), - backend: BackendQueueWorker::new( - conf, - backend_rx, - executor_tx, - control_plane_client, - lsn_table.clone(), - cancel.clone(), - ), executor: ExecutorWorker::new(remote_storage, executor_rx, cancel.clone()), }), ) } - - pub async fn shutdown(&mut self, timeout: Duration) { - self.cancel.cancel(); - - match tokio::time::timeout(timeout, self.client.flush()).await { - Ok(flush_r) => { - match flush_r { - Ok(()) => { - tracing::info!("Deletion queue flushed successfully on shutdown") - } - Err(e) => { - match e { - DeletionQueueError::ShuttingDown => { - // This is not harmful for correctness, but is unexpected: the deletion - // queue's workers should stay alive as long as there are any client handles instantiated. - tracing::warn!("Deletion queue stopped prematurely"); - } - } - } - } - } - Err(e) => { - tracing::warn!("Timed out flushing deletion queue on shutdown ({e})") - } - } - } -} - -#[cfg(test)] -mod test { - use hex_literal::hex; - use std::{ - io::ErrorKind, - path::{Path, PathBuf}, - time::Duration, - }; - use tracing::info; - - use remote_storage::{RemoteStorageConfig, RemoteStorageKind}; - use tokio::task::JoinHandle; - - use crate::{ - repository::Key, - tenant::{ - harness::TenantHarness, remote_timeline_client::remote_timeline_path, - storage_layer::DeltaFileName, - }, - }; - - use super::*; - pub const TIMELINE_ID: TimelineId = - TimelineId::from_array(hex!("11223344556677881122334455667788")); - - pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName { - key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF), - lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51), - }); - - // When you need a second layer in a test. - pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName { - key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF), - lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61), - }); - - struct TestSetup { - harness: TenantHarness, - remote_fs_dir: PathBuf, - storage: GenericRemoteStorage, - mock_control_plane: MockControlPlane, - deletion_queue: DeletionQueue, - worker_join: JoinHandle<()>, - } - - impl TestSetup { - /// Simulate a pageserver restart by destroying and recreating the deletion queue - async fn restart(&mut self) { - let (deletion_queue, workers) = DeletionQueue::new( - Some(self.storage.clone()), - Some(self.mock_control_plane.clone()), - self.harness.conf, - ); - - tracing::debug!("Spawning worker for new queue queue"); - let worker_join = workers - .unwrap() - .spawn_with(&tokio::runtime::Handle::current()); - - let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join); - let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue); - - tracing::debug!("Joining worker from previous queue"); - old_deletion_queue.cancel.cancel(); - old_worker_join - .await - .expect("Failed to join workers for previous deletion queue"); - } - - fn set_latest_generation(&self, gen: Generation) { - let tenant_id = self.harness.tenant_id; - self.mock_control_plane - .latest_generation - .lock() - .unwrap() - .insert(tenant_id, gen); - } - - /// Returns remote layer file name, suitable for use in assert_remote_files - fn write_remote_layer( - &self, - file_name: LayerFileName, - gen: Generation, - ) -> anyhow::Result { - let tenant_id = self.harness.tenant_id; - let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); - let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path()); - std::fs::create_dir_all(&remote_timeline_path)?; - let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix()); - - let content: Vec = format!("placeholder contents of {file_name}").into(); - - std::fs::write( - remote_timeline_path.join(remote_layer_file_name.clone()), - content, - )?; - - Ok(remote_layer_file_name) - } - } - - #[derive(Debug, Clone)] - struct MockControlPlane { - pub latest_generation: std::sync::Arc>>, - } - - impl MockControlPlane { - fn new() -> Self { - Self { - latest_generation: Arc::new(std::sync::Mutex::new(HashMap::new())), - } - } - } - - unsafe impl Send for MockControlPlane {} - unsafe impl Sync for MockControlPlane {} - - #[async_trait::async_trait] - impl ControlPlaneGenerationsApi for MockControlPlane { - #[allow(clippy::diverging_sub_expression)] // False positive via async_trait - async fn re_attach(&self) -> anyhow::Result> { - unimplemented!() - } - async fn validate( - &self, - tenants: Vec<(TenantId, Generation)>, - ) -> anyhow::Result> { - let mut result = HashMap::new(); - - let latest_generation = self.latest_generation.lock().unwrap(); - - for (tenant_id, generation) in tenants { - if let Some(latest) = latest_generation.get(&tenant_id) { - result.insert(tenant_id, *latest == generation); - } - } - - Ok(result) - } - } - - fn setup(test_name: &str) -> anyhow::Result { - let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}"))); - let harness = TenantHarness::create(test_name)?; - - // We do not load() the harness: we only need its config and remote_storage - - // Set up a GenericRemoteStorage targetting a directory - let remote_fs_dir = harness.conf.workdir.join("remote_fs"); - std::fs::create_dir_all(remote_fs_dir)?; - let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?; - let storage_config = RemoteStorageConfig { - max_concurrent_syncs: std::num::NonZeroUsize::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, - ) - .unwrap(), - max_sync_errors: std::num::NonZeroU32::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS, - ) - .unwrap(), - storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), - }; - let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); - - let mock_control_plane = MockControlPlane::new(); - - let (deletion_queue, worker) = DeletionQueue::new( - Some(storage.clone()), - Some(mock_control_plane.clone()), - harness.conf, - ); - - let worker = worker.unwrap(); - let worker_join = worker.spawn_with(&tokio::runtime::Handle::current()); - - Ok(TestSetup { - harness, - remote_fs_dir, - storage, - mock_control_plane, - deletion_queue, - worker_join, - }) - } - - // TODO: put this in a common location so that we can share with remote_timeline_client's tests - fn assert_remote_files(expected: &[&str], remote_path: &Path) { - let mut expected: Vec = expected.iter().map(|x| String::from(*x)).collect(); - expected.sort(); - - let mut found: Vec = Vec::new(); - let dir = match std::fs::read_dir(remote_path) { - Ok(d) => d, - Err(e) => { - if e.kind() == ErrorKind::NotFound { - if expected.is_empty() { - // We are asserting prefix is empty: it is expected that the dir is missing - return; - } else { - assert_eq!(expected, Vec::::new()); - unreachable!(); - } - } else { - panic!( - "Unexpected error listing {}: {e}", - remote_path.to_string_lossy() - ); - } - } - }; - - for entry in dir.flatten() { - let entry_name = entry.file_name(); - let fname = entry_name.to_str().unwrap(); - found.push(String::from(fname)); - } - found.sort(); - - assert_eq!(expected, found); - } - - fn assert_local_files(expected: &[&str], directory: &Path) { - let dir = match std::fs::read_dir(directory) { - Ok(d) => d, - Err(_) => { - assert_eq!(expected, &Vec::::new()); - return; - } - }; - let mut found = Vec::new(); - for dentry in dir { - let dentry = dentry.unwrap(); - let file_name = dentry.file_name(); - let file_name_str = file_name.to_string_lossy(); - found.push(file_name_str.to_string()); - } - found.sort(); - assert_eq!(expected, found); - } - - #[tokio::test] - async fn deletion_queue_smoke() -> anyhow::Result<()> { - // Basic test that the deletion queue processes the deletions we pass into it - let ctx = setup("deletion_queue_smoke").expect("Failed test setup"); - let client = ctx.deletion_queue.new_client(); - - let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); - let tenant_id = ctx.harness.tenant_id; - - let content: Vec = "victim1 contents".into(); - let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); - let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); - let deletion_prefix = ctx.harness.conf.deletion_prefix(); - - // Exercise the distinction between the generation of the layers - // we delete, and the generation of the running Tenant. - let layer_generation = Generation::new(0xdeadbeef); - let now_generation = Generation::new(0xfeedbeef); - - let remote_layer_file_name_1 = - format!("{}{}", layer_file_name_1, layer_generation.get_suffix()); - - // Set mock control plane state to valid for our generation - ctx.set_latest_generation(now_generation); - - // Inject a victim file to remote storage - info!("Writing"); - std::fs::create_dir_all(&remote_timeline_path)?; - std::fs::write( - remote_timeline_path.join(remote_layer_file_name_1.clone()), - content, - )?; - assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); - - // File should still be there after we push it to the queue (we haven't pushed enough to flush anything) - info!("Pushing"); - client - .push_layers( - tenant_id, - TIMELINE_ID, - now_generation, - [(layer_file_name_1.clone(), layer_generation)].to_vec(), - ) - .await?; - assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); - - assert_local_files(&[], &deletion_prefix); - - // File should still be there after we write a deletion list (we haven't pushed enough to execute anything) - info!("Flushing"); - client.flush().await?; - assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); - assert_local_files(&["0000000000000001-01.list"], &deletion_prefix); - - // File should go away when we execute - info!("Flush-executing"); - client.flush_execute().await?; - assert_remote_files(&[], &remote_timeline_path); - assert_local_files(&["header-01"], &deletion_prefix); - - // Flushing on an empty queue should succeed immediately, and not write any lists - info!("Flush-executing on empty"); - client.flush_execute().await?; - assert_local_files(&["header-01"], &deletion_prefix); - - Ok(()) - } - - #[tokio::test] - async fn deletion_queue_validation() -> anyhow::Result<()> { - let ctx = setup("deletion_queue_validation").expect("Failed test setup"); - let client = ctx.deletion_queue.new_client(); - - // Generation that the control plane thinks is current - let latest_generation = Generation::new(0xdeadbeef); - // Generation that our DeletionQueue thinks the tenant is running with - let stale_generation = latest_generation.previous(); - // Generation that our example layer file was written with - let layer_generation = stale_generation.previous(); - - ctx.set_latest_generation(latest_generation); - - let tenant_id = ctx.harness.tenant_id; - let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); - let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); - - // Initial state: a remote layer exists - let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?; - assert_remote_files(&[&remote_layer_name], &remote_timeline_path); - - tracing::debug!("Pushing..."); - client - .push_layers( - tenant_id, - TIMELINE_ID, - stale_generation, - [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(), - ) - .await?; - - // We enqueued the operation in a stale generation: it should have failed validation - tracing::debug!("Flushing..."); - tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??; - assert_remote_files(&[&remote_layer_name], &remote_timeline_path); - - tracing::debug!("Pushing..."); - client - .push_layers( - tenant_id, - TIMELINE_ID, - latest_generation, - [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(), - ) - .await?; - - // We enqueued the operation in a fresh generation: it should have passed validation - tracing::debug!("Flushing..."); - tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??; - assert_remote_files(&[], &remote_timeline_path); - - Ok(()) - } - - #[tokio::test] - async fn deletion_queue_recovery() -> anyhow::Result<()> { - // Basic test that the deletion queue processes the deletions we pass into it - let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); - let client = ctx.deletion_queue.new_client(); - - let tenant_id = ctx.harness.tenant_id; - - let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); - let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); - let deletion_prefix = ctx.harness.conf.deletion_prefix(); - - let layer_generation = Generation::new(0xdeadbeef); - let now_generation = Generation::new(0xfeedbeef); - - // Inject a deletion in the generation before generation_now: after restart, - // this deletion should _not_ get executed (only the immediately previous - // generation gets that treatment) - let remote_layer_file_name_historical = - ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?; - client - .push_layers( - tenant_id, - TIMELINE_ID, - now_generation.previous(), - [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(), - ) - .await?; - - // Inject a deletion in the generation before generation_now: after restart, - // this deletion should get executed, because we execute deletions in the - // immediately previous generation on the same node. - let remote_layer_file_name_previous = - ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?; - client - .push_layers( - tenant_id, - TIMELINE_ID, - now_generation, - [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(), - ) - .await?; - - client.flush().await?; - assert_remote_files( - &[ - &remote_layer_file_name_historical, - &remote_layer_file_name_previous, - ], - &remote_timeline_path, - ); - - // Different generatinos for the same tenant will cause two separate - // deletion lists to be emitted. - assert_local_files( - &["0000000000000001-01.list", "0000000000000002-01.list"], - &deletion_prefix, - ); - - // Simulate a node restart: the latest generation advances - let now_generation = now_generation.next(); - ctx.set_latest_generation(now_generation); - - // Restart the deletion queue - drop(client); - ctx.restart().await; - let client = ctx.deletion_queue.new_client(); - client - .recover(HashMap::from([(tenant_id, now_generation)])) - .await?; - - info!("Flush-executing"); - client.flush_execute().await?; - // The deletion from immediately prior generation was executed, the one from - // an older generation was not. - assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path); - Ok(()) - } } /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it. #[cfg(test)] -pub mod mock { +pub(crate) mod mock { use tracing::info; - use crate::tenant::remote_timeline_client::remote_layer_path; - use super::*; use std::sync::{ atomic::{AtomicUsize, Ordering}, @@ -1186,7 +236,6 @@ pub mod mock { }; pub struct ConsumerState { - rx: tokio::sync::mpsc::Receiver, executor_rx: tokio::sync::mpsc::Receiver, } @@ -1218,73 +267,28 @@ pub mod mock { } } - while let Ok(msg) = self.rx.try_recv() { - match msg { - FrontendQueueMessage::Delete(op) => { - let mut objects = op.objects; - for (layer, generation) in op.layers { - objects.push(remote_layer_path( - &op.tenant_id, - &op.timeline_id, - &layer, - generation, - )); - } - - for path in objects { - info!("Executing deletion {path}"); - match remote_storage.delete(&path).await { - Ok(_) => { - debug!("Deleted {path}"); - } - Err(e) => { - error!("Failed to delete {path}, leaking object! ({e})"); - } - } - executed += 1; - } - } - FrontendQueueMessage::Flush(op) => { - op.fire(); - } - FrontendQueueMessage::FlushExecute(op) => { - // We have already executed all prior deletions because mock does them inline - op.fire(); - } - FrontendQueueMessage::Recover(_) => { - // no-op in mock - } - } - info!("All pending deletions have been executed"); - } - executed } } pub struct MockDeletionQueue { - tx: tokio::sync::mpsc::Sender, executor_tx: tokio::sync::mpsc::Sender, executed: Arc, remote_storage: Option, consumer: std::sync::Mutex, - lsn_table: Arc>, } impl MockDeletionQueue { pub fn new(remote_storage: Option) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(16384); let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384); let executed = Arc::new(AtomicUsize::new(0)); Self { - tx, executor_tx, executed, remote_storage, - consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }), - lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), + consumer: std::sync::Mutex::new(ConsumerState { executor_rx }), } } @@ -1305,9 +309,7 @@ pub mod mock { pub(crate) fn new_client(&self) -> DeletionQueueClient { DeletionQueueClient { - tx: self.tx.clone(), executor_tx: self.executor_tx.clone(), - lsn_table: self.lsn_table.clone(), } } } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs deleted file mode 100644 index 7777ded893..0000000000 --- a/pageserver/src/deletion_queue/backend.rs +++ /dev/null @@ -1,349 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - -use tokio_util::sync::CancellationToken; -use tracing::debug; -use tracing::info; -use tracing::warn; - -use crate::config::PageServerConf; -use crate::control_plane_client::ControlPlaneGenerationsApi; -use crate::metrics::DELETION_QUEUE_DROPPED; -use crate::metrics::DELETION_QUEUE_ERRORS; - -use super::executor::ExecutorMessage; -use super::DeletionHeader; -use super::DeletionList; -use super::DeletionQueueError; -use super::FlushOp; -use super::VisibleLsnUpdates; - -// After this length of time, do any validation work that is pending, -// even if we haven't accumulated many keys to delete. -// -// This also causes updates to remote_consistent_lsn to be validated, even -// if there were no deletions enqueued. -const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10); - -// If we have received this number of keys, proceed with attempting to execute -const AUTOFLUSH_KEY_COUNT: usize = 16384; - -#[derive(Debug)] -pub(super) enum BackendQueueMessage { - Delete(DeletionList), - Flush(FlushOp), -} -pub(super) struct BackendQueueWorker -where - C: ControlPlaneGenerationsApi, -{ - conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, - tx: tokio::sync::mpsc::Sender, - - // Client for calling into control plane API for validation of deletes - control_plane_client: Option, - - // DeletionLists which are waiting generation validation. Not safe to - // execute until [`validate`] has processed them. - pending_lists: Vec, - - // DeletionLists which have passed validation and are ready to execute. - validated_lists: Vec, - - // Sum of all the lengths of lists in pending_lists - pending_key_count: usize, - - // Lsn validation state: we read projected LSNs and write back visible LSNs - // after validation. This is the LSN equivalent of `pending_validation_lists`: - // it is drained in [`validate`] - lsn_table: Arc>, - - cancel: CancellationToken, -} - -impl BackendQueueWorker -where - C: ControlPlaneGenerationsApi, -{ - pub(super) fn new( - conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, - tx: tokio::sync::mpsc::Sender, - control_plane_client: Option, - lsn_table: Arc>, - cancel: CancellationToken, - ) -> Self { - Self { - conf, - rx, - tx, - control_plane_client, - lsn_table, - pending_lists: Vec::new(), - validated_lists: Vec::new(), - pending_key_count: 0, - cancel, - } - } - - async fn cleanup_lists(&mut self, lists: Vec) { - for list in lists { - let list_path = self.conf.deletion_list_path(list.sequence); - debug!("Removing deletion list {list} at {}", list_path.display()); - - if let Err(e) = tokio::fs::remove_file(&list_path).await { - // Unexpected: we should have permissions and nothing else should - // be touching these files. We will leave the file behind. Subsequent - // pageservers will try and load it again: hopefully whatever storage - // issue (probably permissions) has been fixed by then. - tracing::error!("Failed to delete {}: {e:#}", list_path.display()); - break; - } - } - } - - /// Process any outstanding validations of generations of pending LSN updates or pending - /// DeletionLists. - /// - /// Valid LSN updates propagate back to their result channel immediately, valid DeletionLists - /// go into the queue of ready-to-execute lists. - async fn validate(&mut self) -> Result<(), DeletionQueueError> { - let mut tenant_generations = HashMap::new(); - for list in &self.pending_lists { - for (tenant_id, tenant_list) in &list.tenants { - // Note: DeletionLists are in logical time order, so generation always - // goes up. By doing a simple insert() we will always end up with - // the latest generation seen for a tenant. - tenant_generations.insert(*tenant_id, tenant_list.generation); - } - } - - let pending_lsn_updates = { - let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned"); - let mut pending_updates = VisibleLsnUpdates::new(); - std::mem::swap(&mut pending_updates, &mut lsn_table); - pending_updates - }; - for (tenant_id, update) in &pending_lsn_updates.tenants { - let entry = tenant_generations - .entry(*tenant_id) - .or_insert(update.generation); - if update.generation > *entry { - *entry = update.generation; - } - } - - if tenant_generations.is_empty() { - // No work to do - return Ok(()); - } - - let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client { - control_plane_client - .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect()) - .await - // The only wait a validation call returns an error is when the cancellation token fires - .map_err(|_| DeletionQueueError::ShuttingDown)? - } else { - // Control plane API disabled. In legacy mode we consider everything valid. - tenant_generations.keys().map(|k| (*k, true)).collect() - }; - - let mut validated_sequence: Option = None; - - // Apply the validation results to the pending LSN updates - for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants { - let validated_generation = tenant_generations - .get(&tenant_id) - .expect("Map was built from the same keys we're reading"); - - // If the tenant was missing from the validation response, it has been deleted. We may treat - // deletions as valid as the tenant's remote storage is all to be wiped anyway. - let valid = tenants_valid.get(&tenant_id).copied().unwrap_or(true); - - if valid && *validated_generation == tenant_lsn_state.generation { - for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines { - // Drop result of send: it is legal for the Timeline to have been dropped along - // with its queue receiver while we were doing validation. - pending_lsn.result_slot.store(pending_lsn.projected); - } - } else { - // If we failed validation, then do not apply any of the projected updates - warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {0:?}", tenant_lsn_state.generation); - } - } - - // Apply the validation results to the pending deletion lists - for list in &mut self.pending_lists { - // Filter the list based on whether the server responded valid: true. - // If a tenant is omitted in the response, it has been deleted, and we should - // proceed with deletion. - let mut mutated = false; - list.tenants.retain(|tenant_id, tenant| { - let validated_generation = tenant_generations - .get(tenant_id) - .expect("Map was built from the same keys we're reading"); - - // If the tenant was missing from the validation response, it has been deleted. We may treat - // deletions as valid as the tenant's remote storage is all to be wiped anyway. - let valid = tenants_valid.get(tenant_id).copied().unwrap_or(true); - - // A list is valid if it comes from the current _or previous_ generation. - // The previous generation case is due to how we store deletion lists locally: - // if we see the immediately previous generation in a locally stored deletion list, - // it proves that this node's disk was used for both current & previous generations, - // and therefore no other node was involved in between: the two generations may be - // logically treated as the same. - let this_list_valid = valid - && (tenant.generation == *validated_generation); - - if !this_list_valid { - warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation); - DELETION_QUEUE_DROPPED.inc_by(tenant.len() as u64); - mutated = true; - } - this_list_valid - }); - list.validated = true; - - if mutated { - // Save the deletion list if we had to make changes due to stale generations. The - // saved list is valid for execution. - if let Err(e) = list.save(self.conf).await { - // Highly unexpected. Could happen if e.g. disk full. - // If we didn't save the trimmed list, it is _not_ valid to execute. - warn!("Failed to save modified deletion list {list}: {e:#}"); - - // Rather than have a complex retry process, just drop it and leak the objects, - // scrubber will clean up eventually. - list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution. - } - } - - validated_sequence = Some(list.sequence); - } - - if let Some(validated_sequence) = validated_sequence { - // Write the queue header to record how far validation progressed. This avoids having - // to rewrite each DeletionList to set validated=true in it. - let header = DeletionHeader::new(validated_sequence); - - // Drop result because the validated_sequence is an optimization. If we fail to save it, - // then restart, we will drop some deletion lists, creating work for scrubber. - // The save() function logs a warning on error. - if let Err(e) = header.save(self.conf).await { - warn!("Failed to write deletion queue header: {e:#}"); - DELETION_QUEUE_ERRORS - .with_label_values(&["put_header"]) - .inc(); - } - } - - // Transfer the validated lists to the validated queue, for eventual execution - self.validated_lists.append(&mut self.pending_lists); - - Ok(()) - } - - async fn flush(&mut self) -> Result<(), DeletionQueueError> { - tracing::debug!("Flushing with {} pending lists", self.pending_lists.len()); - - // Issue any required generation validation calls to the control plane - self.validate().await?; - - // After successful validation, nothing is pending: any lists that - // made it through validation will be in validated_lists. - assert!(self.pending_lists.is_empty()); - self.pending_key_count = 0; - - tracing::debug!( - "Validation complete, have {} validated lists", - self.validated_lists.len() - ); - - // Return quickly if we have no validated lists to execute. This avoids flushing the - // executor when an idle backend hits its autoflush interval - if self.validated_lists.is_empty() { - return Ok(()); - } - - // Drain `validated_lists` into the executor - let mut executing_lists = Vec::new(); - for mut list in self.validated_lists.drain(..) { - let objects = list.drain_paths(); - self.tx - .send(ExecutorMessage::Delete(objects)) - .await - .map_err(|_| DeletionQueueError::ShuttingDown)?; - executing_lists.push(list); - } - - self.flush_executor().await?; - - // Erase the deletion lists whose keys have all be deleted from remote storage - self.cleanup_lists(executing_lists).await; - - Ok(()) - } - - async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> { - // Flush the executor, so that all the keys referenced by these deletion lists - // are actually removed from remote storage. This is a precondition to deleting - // the deletion lists themselves. - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - let flush_op = FlushOp { tx }; - self.tx - .send(ExecutorMessage::Flush(flush_op)) - .await - .map_err(|_| DeletionQueueError::ShuttingDown)?; - - rx.await.map_err(|_| DeletionQueueError::ShuttingDown) - } - - pub(super) async fn background(&mut self) { - tracing::info!("Started deletion backend worker"); - - while !self.cancel.is_cancelled() { - let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await { - Ok(Some(m)) => m, - Ok(None) => { - // All queue senders closed - info!("Shutting down"); - break; - } - Err(_) => { - // Timeout, we hit deadline to execute whatever we have in hand. These functions will - // return immediately if no work is pending. - // Drop result, because it' a background flush and we don't care whether it really worked. - drop(self.flush().await); - - continue; - } - }; - - match msg { - BackendQueueMessage::Delete(list) => { - if list.validated { - self.validated_lists.push(list) - } else { - self.pending_key_count += list.len(); - self.pending_lists.push(list); - } - - if self.pending_key_count > AUTOFLUSH_KEY_COUNT { - // Drop possible shutdown error, because we will just fall out of loop if that happens - drop(self.flush().await); - } - } - BackendQueueMessage::Flush(op) => { - if let Ok(()) = self.flush().await { - // If we fail due to shutting down, we will just drop `op` to propagate that status. - op.fire(); - } - } - } - } - } -} diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs deleted file mode 100644 index b9abd1dc99..0000000000 --- a/pageserver/src/deletion_queue/frontend.rs +++ /dev/null @@ -1,430 +0,0 @@ -use super::BackendQueueMessage; -use super::DeletionHeader; -use super::DeletionList; -use super::FlushOp; - -use std::collections::HashMap; -use std::fs::create_dir_all; -use std::time::Duration; - -use regex::Regex; -use remote_storage::RemotePath; -use tokio_util::sync::CancellationToken; -use tracing::debug; -use tracing::info; -use tracing::warn; -use utils::generation::Generation; -use utils::id::TenantId; -use utils::id::TimelineId; - -use crate::config::PageServerConf; -use crate::metrics::DELETION_QUEUE_ERRORS; -use crate::metrics::DELETION_QUEUE_SUBMITTED; -use crate::tenant::remote_timeline_client::remote_layer_path; -use crate::tenant::storage_layer::LayerFileName; - -// The number of keys in a DeletionList before we will proactively persist it -// (without reaching a flush deadline). This aims to deliver objects of the order -// of magnitude 1MB when we are under heavy delete load. -const DELETION_LIST_TARGET_SIZE: usize = 16384; - -// Ordinarily, we only flush to DeletionList periodically, to bound the window during -// which we might leak objects from not flushing a DeletionList after -// the objects are already unlinked from timeline metadata. -const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000); - -// If someone is waiting for a flush to DeletionList, only delay a little to accumulate -// more objects before doing the flush. -const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100); - -#[derive(Debug)] -pub(super) struct DeletionOp { - pub(super) tenant_id: TenantId, - pub(super) timeline_id: TimelineId, - // `layers` and `objects` are both just lists of objects. `layers` is used if you do not - // have a config object handy to project it to a remote key, and need the consuming worker - // to do it for you. - pub(super) layers: Vec<(LayerFileName, Generation)>, - pub(super) objects: Vec, - - /// The _current_ generation of the Tenant attachment in which we are enqueuing - /// this deletion. - pub(super) generation: Generation, -} - -#[derive(Debug)] -pub(super) struct RecoverOp { - pub(super) attached_tenants: HashMap, -} - -#[derive(Debug)] -pub(super) enum FrontendQueueMessage { - Delete(DeletionOp), - // Wait until all prior deletions make it into a persistent DeletionList - Flush(FlushOp), - // Wait until all prior deletions have been executed (i.e. objects are actually deleted) - FlushExecute(FlushOp), - // Call once after re-attaching to control plane, to notify the deletion queue about - // latest attached generations & load any saved deletion lists from disk. - Recover(RecoverOp), -} - -pub(super) struct FrontendQueueWorker { - conf: &'static PageServerConf, - - // Incoming frontend requests to delete some keys - rx: tokio::sync::mpsc::Receiver, - - // Outbound requests to the backend to execute deletion lists we have composed. - tx: tokio::sync::mpsc::Sender, - - // The list we are currently building, contains a buffer of keys to delete - // and our next sequence number - pending: DeletionList, - - // These FlushOps should fire the next time we flush - pending_flushes: Vec, - - // Worker loop is torn down when this fires. - cancel: CancellationToken, -} - -impl FrontendQueueWorker { - // Initially DeletionHeader.validated_sequence is zero. The place we start our - // sequence numbers must be higher than that. - const BASE_SEQUENCE: u64 = 1; - - pub(super) fn new( - conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, - tx: tokio::sync::mpsc::Sender, - cancel: CancellationToken, - ) -> Self { - Self { - pending: DeletionList::new(Self::BASE_SEQUENCE), - conf, - rx, - tx, - pending_flushes: Vec::new(), - cancel, - } - } - - /// Try to flush `list` to persistent storage - /// - /// This does not return errors, because on failure to flush we do not lose - /// any state: flushing will be retried implicitly on the next deadline - async fn flush(&mut self) { - if self.pending.is_empty() { - for f in self.pending_flushes.drain(..) { - f.fire(); - } - return; - } - - match self.pending.save(self.conf).await { - Ok(_) => { - info!(sequence = self.pending.sequence, "Stored deletion list"); - - for f in self.pending_flushes.drain(..) { - f.fire(); - } - - let onward_list = self.pending.drain(); - - // We have consumed out of pending: reset it for the next incoming deletions to accumulate there - self.pending = DeletionList::new(self.pending.sequence + 1); - - if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await { - // This is allowed to fail: it will only happen if the backend worker is shut down, - // so we can just drop this on the floor. - info!("Deletion list dropped, this is normal during shutdown ({e:#})"); - } - } - Err(e) => { - DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); - warn!( - sequence = self.pending.sequence, - "Failed to write deletion list, will retry later ({e:#})" - ); - } - } - } - - /// Load the header, to learn the sequence number up to which deletions - /// have been validated. We will apply validated=true to DeletionLists - /// <= this sequence when loading them. - /// - /// It is not an error for the header to not exist: we return None, and - /// the caller should act as if validated_sequence is 0 - async fn load_validated_sequence(&self) -> Result, anyhow::Error> { - let header_path = self.conf.deletion_header_path(); - match tokio::fs::read(&header_path).await { - Ok(header_bytes) => { - match serde_json::from_slice::(&header_bytes) { - Ok(h) => Ok(Some(h.validated_sequence)), - Err(e) => { - warn!( - "Failed to deserialize deletion header, ignoring {}: {e:#}", - header_path.display() - ); - // This should never happen unless we make a mistake with our serialization. - // Ignoring a deletion header is not consequential for correctnes because all deletions - // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up. - Ok(None) - } - } - } - Err(e) => { - if e.kind() == std::io::ErrorKind::NotFound { - debug!( - "Deletion header {} not found, first start?", - header_path.display() - ); - Ok(None) - } else { - Err(anyhow::anyhow!(e)) - } - } - } - } - - async fn recover( - &mut self, - attached_tenants: HashMap, - ) -> Result<(), anyhow::Error> { - debug!( - "recovering with {} attached tenants", - attached_tenants.len() - ); - - // Load the header - let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0); - - // Start our next deletion list from after the last location validated by - // previous process lifetime, or after the last location found (it is updated - // below after enumerating the deletion lists) - self.pending.sequence = std::cmp::max(self.pending.sequence, validated_sequence + 1); - - let deletion_directory = self.conf.deletion_prefix(); - let mut dir = match tokio::fs::read_dir(&deletion_directory).await { - Ok(d) => d, - Err(e) => { - warn!( - "Failed to open deletion list directory {}: {e:#}", - deletion_directory.display(), - ); - - // Give up: if we can't read the deletion list directory, we probably can't - // write lists into it later, so the queue won't work. - return Err(e.into()); - } - }; - - let list_name_pattern = Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{2}).list").unwrap(); - - let header_path = self.conf.deletion_header_path(); - let mut seqs: Vec = Vec::new(); - while let Some(dentry) = dir.next_entry().await? { - if Some(dentry.file_name().as_os_str()) == header_path.file_name() { - // Don't try and parse the header's name like a list - continue; - } - - let file_name = dentry.file_name().to_owned(); - let basename = file_name.to_string_lossy(); - let seq_part = if let Some(m) = list_name_pattern.captures(&basename) { - m.get(1) - .expect("Non optional group should be present") - .as_str() - } else { - warn!("Unexpected key in deletion queue: {basename}"); - continue; - }; - - let seq: u64 = match u64::from_str_radix(seq_part, 16) { - Ok(s) => s, - Err(e) => { - warn!("Malformed key '{basename}': {e}"); - continue; - } - }; - seqs.push(seq); - } - seqs.sort(); - - // Initialize the next sequence number in the frontend based on the maximum of the highest list we see, - // and the last list that was deleted according to the header. Combined with writing out the header - // prior to deletions, this guarnatees no re-use of sequence numbers. - if let Some(max_list_seq) = seqs.last() { - self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1); - } - - for s in seqs { - let list_path = self.conf.deletion_list_path(s); - - let list_bytes = tokio::fs::read(&list_path).await?; - - let mut deletion_list = match serde_json::from_slice::(&list_bytes) { - Ok(l) => l, - Err(e) => { - // Drop the list on the floor: any objects it referenced will be left behind - // for scrubbing to clean up. This should never happen unless we have a serialization bug. - warn!(sequence = s, "Failed to deserialize deletion list: {e}"); - continue; - } - }; - - if deletion_list.sequence <= validated_sequence { - // If the deletion list falls below valid_seq, we may assume that it was - // already validated the last time this pageserver ran. Otherwise, we still - // load it, as it may still contain content valid in this generation. - deletion_list.validated = true; - } else { - // Special case optimization: if a tenant is still attached, and no other - // generation was issued to another node in the interval while we restarted, - // then we may treat deletion lists from the previous generation as if they - // belong to our currently attached generation, and proceed to validate & execute. - for (tenant_id, tenant_list) in &mut deletion_list.tenants { - if let Some(attached_gen) = attached_tenants.get(tenant_id) { - if attached_gen.previous() == tenant_list.generation { - tenant_list.generation = *attached_gen; - } - } - } - } - - info!( - validated = deletion_list.validated, - sequence = deletion_list.sequence, - "Recovered deletion list" - ); - - // We will drop out of recovery if this fails: it indicates that we are shutting down - // or the backend has panicked - DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64); - self.tx - .send(BackendQueueMessage::Delete(deletion_list)) - .await?; - } - - info!(next_sequence = self.pending.sequence, "Replay complete"); - - Ok(()) - } - - /// This is the front-end ingest, where we bundle up deletion requests into DeletionList - /// and write them out, for later validation by the backend and execution by the executor. - pub(super) async fn background(&mut self) { - info!("Started deletion frontend worker"); - - // Synchronous, but we only do it once per process lifetime so it's tolerable - if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) { - tracing::error!( - "Failed to create deletion list directory {}, deletions will not be executed ({e})", - self.conf.deletion_prefix().display() - ); - return; - } - - while !self.cancel.is_cancelled() { - let timeout = if self.pending_flushes.is_empty() { - FRONTEND_DEFAULT_TIMEOUT - } else { - FRONTEND_FLUSHING_TIMEOUT - }; - - let msg = match tokio::time::timeout(timeout, self.rx.recv()).await { - Ok(Some(msg)) => msg, - Ok(None) => { - // Queue sender destroyed, shutting down - break; - } - Err(_) => { - // Hit deadline, flush. - self.flush().await; - continue; - } - }; - - match msg { - FrontendQueueMessage::Delete(op) => { - debug!( - "Delete: ingesting {} layers, {} other objects", - op.layers.len(), - op.objects.len() - ); - - let mut layer_paths = Vec::new(); - for (layer, generation) in op.layers { - layer_paths.push(remote_layer_path( - &op.tenant_id, - &op.timeline_id, - &layer, - generation, - )); - } - layer_paths.extend(op.objects); - - if !self.pending.push( - &op.tenant_id, - &op.timeline_id, - op.generation, - &mut layer_paths, - ) { - self.flush().await; - let retry_succeeded = self.pending.push( - &op.tenant_id, - &op.timeline_id, - op.generation, - &mut layer_paths, - ); - if !retry_succeeded { - // Unexpected: after we flush, we should have - // drained self.pending, so a conflict on - // generation numbers should be impossible. - tracing::error!( - "Failed to enqueue deletions, leaking objects. This is a bug." - ); - } - } - } - FrontendQueueMessage::Flush(op) => { - if self.pending.is_empty() { - // Execute immediately - debug!("Flush: No pending objects, flushing immediately"); - op.fire() - } else { - // Execute next time we flush - debug!("Flush: adding to pending flush list for next deadline flush"); - self.pending_flushes.push(op); - } - } - FrontendQueueMessage::FlushExecute(op) => { - debug!("FlushExecute: passing through to backend"); - // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute - if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await { - info!("Can't flush, shutting down ({e})"); - // Caller will get error when their oneshot sender was dropped. - } - } - FrontendQueueMessage::Recover(op) => { - if let Err(e) = self.recover(op.attached_tenants).await { - // This should only happen in truly unrecoverable cases, like the recovery finding that the backend - // queue receiver has been dropped, or something is critically broken with - // the local filesystem holding deletion lists. - info!( - "Deletion queue recover aborted, deletion queue will not proceed ({e})" - ); - return; - } - } - } - - if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() { - self.flush().await; - } - } - info!("Deletion queue shut down."); - } -} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e61a9dcf3f..986f86ec93 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; -use futures::TryFutureExt; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; @@ -1147,39 +1146,6 @@ async fn timeline_download_remote_layers_handler_get( json_response(StatusCode::OK, info) } -async fn deletion_queue_flush( - r: Request, - cancel: CancellationToken, -) -> Result, ApiError> { - let state = get_state(&r); - - if state.remote_storage.is_none() { - // Nothing to do if remote storage is disabled. - return json_response(StatusCode::OK, ()); - } - - let execute = parse_query_param(&r, "execute")?.unwrap_or(false); - - let flush = async { - if execute { - state.deletion_queue_client.flush_execute().await - } else { - state.deletion_queue_client.flush().await - } - } - // DeletionQueueError's only case is shutting down. - .map_err(|_| ApiError::ShuttingDown); - - tokio::select! { - res = flush => { - res.map(|()| json_response(StatusCode::OK, ()))? - } - _ = cancel.cancelled() => { - Err(ApiError::ShuttingDown) - } - } -} - async fn active_timeline_of_active_tenant( tenant_id: TenantId, timeline_id: TimelineId, @@ -1514,9 +1480,6 @@ pub fn make_router( .put("/v1/disk_usage_eviction/run", |r| { api_handler(r, disk_usage_eviction_run) }) - .put("/v1/deletion_queue/flush", |r| { - api_handler(r, deletion_queue_flush) - }) .put("/v1/tenant/:tenant_id/break", |r| { testing_api_handler("set tenant state to broken", r, handle_tenant_break) }) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index e370e063ba..0b6d4b2903 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -3,7 +3,6 @@ pub mod basebackup; pub mod config; pub mod consumption_metrics; pub mod context; -pub mod control_plane_client; pub mod deletion_queue; pub mod disk_usage_eviction_task; pub mod http; @@ -51,7 +50,7 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); pub use crate::metrics::preinitialize_metrics; #[tracing::instrument(skip_all, fields(%exit_code))] -pub async fn shutdown_pageserver(deletion_queue: Option, exit_code: i32) { +pub async fn shutdown_pageserver(_deletion_queue: Option, exit_code: i32) { use std::time::Duration; // Shut down the libpq endpoint task. This prevents new connections from // being accepted. @@ -79,11 +78,6 @@ pub async fn shutdown_pageserver(deletion_queue: Option, exit_cod ) .await; - // Best effort to persist any outstanding deletions, to avoid leaking objects - if let Some(mut deletion_queue) = deletion_queue { - deletion_queue.shutdown(Duration::from_secs(5)).await; - } - // Shut down the HTTP endpoint last, so that you can still check the server's // status while it's shutting down. // FIXME: We should probably stop accepting commands like attach/detach earlier. diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index f0e6042ad5..d40b31cb93 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -874,14 +874,6 @@ pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -pub(crate) static DELETION_QUEUE_DROPPED: Lazy = Lazy::new(|| { - register_int_counter!( - "pageserver_deletion_queue_dropped_total", - "Number of object deletions dropped due to stale generation." - ) - .expect("failed to define a metric") -}); - pub(crate) static DELETION_QUEUE_EXECUTED: Lazy = Lazy::new(|| { register_int_counter!( "pageserver_deletion_queue_executed_total", diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a1ca3d8682..d3715a0644 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -20,7 +20,6 @@ use utils::crashsafe; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; -use crate::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi}; use crate::deletion_queue::DeletionQueueClient; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; @@ -65,17 +64,6 @@ impl TenantsMap { } } -/// This is "safe" in that that it won't leave behind a partially deleted directory -/// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting -/// the contents. -/// -/// This is pageserver-specific, as it relies on future processes after a crash to check -/// for TEMP_FILE_SUFFIX when loading things. -async fn safe_remove_tenant_dir_all(path: impl AsRef) -> std::io::Result<()> { - let tmp_path = safe_rename_tenant_dir(path).await?; - fs::remove_dir_all(tmp_path).await -} - async fn safe_rename_tenant_dir(path: impl AsRef) -> std::io::Result { let parent = path .as_ref() @@ -115,22 +103,6 @@ pub async fn init_tenant_mgr( let mut tenants = HashMap::new(); - // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. - let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) { - let result = client.re_attach().await?; - - // Tip off the deletion queue about latest attached generations before starting any Tenants - resources - .deletion_queue_client - .recover(result.clone()) - .await?; - - Some(result) - } else { - info!("Control plane API not configured, tenant generations are disabled"); - None - }; - let mut dir_entries = fs::read_dir(&tenants_dir) .await .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?; @@ -196,31 +168,7 @@ pub async fn init_tenant_mgr( } }; - let generation = if let Some(generations) = &tenant_generations { - // We have a generation map: treat it as the authority for whether - // this tenant is really attached. - if let Some(gen) = generations.get(&tenant_id) { - *gen - } else { - info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response"); - if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await { - error!( - "Failed to remove detached tenant directory '{}': {:?}", - tenant_dir_path.display(), - e - ); - } - continue; - } - } else { - // Legacy mode: no generation information, any tenant present - // on local disk may activate - info!( - "Starting tenant {} in legacy mode, no generation", - tenant_dir_path.display() - ); - Generation::none() - }; + let generation = Generation::none(); match schedule_local_tenant_processing( conf, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 7eb00c0405..4cb6617802 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1196,67 +1196,49 @@ impl RemoteTimelineClient { } // The task has completed successfully. Remove it from the in-progress list. - let lsn_update = { - let mut upload_queue_guard = self.upload_queue.lock().unwrap(); - let upload_queue = match upload_queue_guard.deref_mut() { - UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"), - UploadQueue::Stopped(_stopped) => { - None - }, - UploadQueue::Initialized(qi) => { Some(qi) } - }; - - let upload_queue = match upload_queue { - Some(upload_queue) => upload_queue, - None => { - info!("another concurrent task already stopped the queue"); - return; - } - }; - - upload_queue.inprogress_tasks.remove(&task.task_id); - - let lsn_update = match task.op { - UploadOp::UploadLayer(_, _) => { - upload_queue.num_inprogress_layer_uploads -= 1; - None - } - UploadOp::UploadMetadata(_, lsn) => { - upload_queue.num_inprogress_metadata_uploads -= 1; - // XXX monotonicity check? - - upload_queue.projected_remote_consistent_lsn = Some(lsn); - if self.generation.is_none() { - // Legacy mode: skip validating generation - upload_queue.visible_remote_consistent_lsn.store(lsn); - None - } else { - Some((lsn, upload_queue.visible_remote_consistent_lsn.clone())) - } - } - UploadOp::Delete(_) => { - upload_queue.num_inprogress_deletions -= 1; - None - } - UploadOp::Barrier(_) => unreachable!(), - }; - - // Launch any queued tasks that were unblocked by this one. - self.launch_queued_tasks(upload_queue); - lsn_update + let mut upload_queue_guard = self.upload_queue.lock().unwrap(); + let upload_queue = match upload_queue_guard.deref_mut() { + UploadQueue::Uninitialized => panic!( + "callers are responsible for ensuring this is only called on an initialized queue" + ), + UploadQueue::Stopped(_stopped) => None, + UploadQueue::Initialized(qi) => Some(qi), }; - if let Some((lsn, slot)) = lsn_update { - self.deletion_queue_client - .update_remote_consistent_lsn( - self.tenant_id, - self.timeline_id, - self.generation, - lsn, - slot, - ) - .await; - } + let upload_queue = match upload_queue { + Some(upload_queue) => upload_queue, + None => { + info!("another concurrent task already stopped the queue"); + return; + } + }; + + upload_queue.inprogress_tasks.remove(&task.task_id); + + match task.op { + UploadOp::UploadLayer(_, _) => { + upload_queue.num_inprogress_layer_uploads -= 1; + } + UploadOp::UploadMetadata(_, lsn) => { + upload_queue.num_inprogress_metadata_uploads -= 1; + // XXX monotonicity check? + + upload_queue.projected_remote_consistent_lsn = Some(lsn); + if self.generation.is_none() { + // Legacy mode: skip validating generation + upload_queue.visible_remote_consistent_lsn.store(lsn); + } else { + unimplemented!("no support for generations yet") + } + } + UploadOp::Delete(_) => { + upload_queue.num_inprogress_deletions -= 1; + } + UploadOp::Barrier(_) => unreachable!(), + }; + + // Launch any queued tasks that were unblocked by this one. + self.launch_queued_tasks(upload_queue); self.calls_unfinished_metric_end(&task.op); }