From 3af693749d04f4a811c4adb8006d96eb9c7495a5 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 9 Aug 2023 14:08:12 +0100 Subject: [PATCH] pageserver: wire deletion queue through to Tenant --- pageserver/src/bin/pageserver.rs | 2 ++ pageserver/src/deletion_queue.rs | 8 ++++++++ pageserver/src/http/routes.rs | 9 +++++++++ pageserver/src/tenant.rs | 26 ++++++++++++++++++++++++-- pageserver/src/tenant/mgr.rs | 23 +++++++++++++++++++---- pageserver/src/tenant/timeline.rs | 13 +++++++++++-- 6 files changed, 73 insertions(+), 8 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 966b2b81c0..40bf3869d2 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -389,6 +389,7 @@ fn start_pageserver( conf, broker_client.clone(), remote_storage.clone(), + deletion_queue.clone(), order, ))?; @@ -484,6 +485,7 @@ fn start_pageserver( http_auth, broker_client.clone(), remote_storage, + deletion_queue.clone(), disk_usage_eviction_state, )? .build() diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 0ddb18bcad..124427ee8a 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -13,6 +13,7 @@ use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; /// not compete with the same S3 clients/connections used for higher priority uploads. /// /// DeletionQueue is the frontend that the rest of the pageserver interacts with. +#[derive(Clone)] pub struct DeletionQueue { tx: tokio::sync::mpsc::Sender, } @@ -157,4 +158,11 @@ impl DeletionQueue { }, ) } + + /// A queue to nowhere: attempts to delete will do nothing + #[cfg(test)] + pub fn new_mock() -> Self { + let (tx, _) = tokio::sync::mpsc::channel(16384); + Self { tx } + } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 1e8dada85e..c65cc84a8e 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -23,6 +23,7 @@ use super::models::{ TimelineCreateRequest, TimelineGcRequest, TimelineInfo, }; use crate::context::{DownloadBehavior, RequestContext}; +use crate::deletion_queue::DeletionQueue; use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; @@ -56,6 +57,7 @@ struct State { auth: Option>, allowlist_routes: Vec, remote_storage: Option, + deletion_queue: DeletionQueue, broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, } @@ -65,6 +67,7 @@ impl State { conf: &'static PageServerConf, auth: Option>, remote_storage: Option, + deletion_queue: DeletionQueue, broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, ) -> anyhow::Result { @@ -78,6 +81,7 @@ impl State { allowlist_routes, remote_storage, broker_client, + deletion_queue, disk_usage_eviction_state, }) } @@ -490,6 +494,7 @@ async fn tenant_attach_handler( tenant_conf, state.broker_client.clone(), remote_storage.clone(), + &state.deletion_queue, &ctx, ) .instrument(info_span!("tenant_attach", %tenant_id)) @@ -553,6 +558,7 @@ async fn tenant_load_handler( tenant_id, state.broker_client.clone(), state.remote_storage.clone(), + &state.deletion_queue, &ctx, ) .instrument(info_span!("load", %tenant_id)) @@ -878,6 +884,7 @@ async fn tenant_create_handler( target_tenant_id, state.broker_client.clone(), state.remote_storage.clone(), + &state.deletion_queue, &ctx, ) .instrument(info_span!("tenant_create", tenant_id = %target_tenant_id)) @@ -1334,6 +1341,7 @@ pub fn make_router( auth: Option>, broker_client: BrokerClientChannel, remote_storage: Option, + deletion_queue: DeletionQueue, disk_usage_eviction_state: Arc, ) -> anyhow::Result> { let spec = include_bytes!("openapi_spec.yml"); @@ -1363,6 +1371,7 @@ pub fn make_router( conf, auth, remote_storage, + deletion_queue, broker_client, disk_usage_eviction_state, ) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cedb381ccc..cf159160c0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -57,6 +57,8 @@ use self::timeline::uninit::UninitializedTimeline; use self::timeline::EvictionTaskTenantState; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; +use crate::deletion_queue::DeletionQueue; +use crate::deletion_queue::DeletionQueueClient; use crate::import_datadir; use crate::is_uninit_mark; use crate::metrics::TENANT_ACTIVATION; @@ -185,6 +187,9 @@ pub struct Tenant { // provides access to timeline data sitting in the remote storage remote_storage: Option, + // Access to global deletion queue for when this tenant wants to schedule a deletion + deletion_queue_client: Option, + /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`]. cached_logical_sizes: tokio::sync::Mutex>, cached_synthetic_tenant_size: Arc, @@ -503,6 +508,7 @@ impl Tenant { tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, + deletion_queue: &DeletionQueue, ctx: &RequestContext, ) -> anyhow::Result> { // TODO dedup with spawn_load @@ -517,6 +523,7 @@ impl Tenant { wal_redo_manager, tenant_id, Some(remote_storage), + Some(deletion_queue.new_client()), )); // Do all the hard work in the background @@ -756,6 +763,7 @@ impl Tenant { wal_redo_manager, tenant_id, None, + None, )) } @@ -774,6 +782,7 @@ impl Tenant { tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + deletion_queue: &DeletionQueue, init_order: Option, tenants: &'static tokio::sync::RwLock, ctx: &RequestContext, @@ -796,6 +805,7 @@ impl Tenant { wal_redo_manager, tenant_id, remote_storage.clone(), + Some(deletion_queue.new_client()), ); let tenant = Arc::new(tenant); @@ -2191,7 +2201,16 @@ impl Tenant { walredo_mgr: Arc, tenant_id: TenantId, remote_storage: Option, + deletion_queue_client: Option, ) -> Tenant { + #[cfg(not(test))] + match state { + TenantState::Broken { .. } => {} + _ => { + // Non-broken tenants must be constructed with a deletion queue + assert!(deletion_queue_client.is_some()); + } + } let (state, mut rx) = watch::channel(state); tokio::spawn(async move { @@ -2257,6 +2276,7 @@ impl Tenant { gc_cs: tokio::sync::Mutex::new(()), walredo_mgr, remote_storage, + deletion_queue_client, state, cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), @@ -3389,7 +3409,7 @@ pub mod harness { pub async fn load(&self) -> (Arc, RequestContext) { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); ( - self.try_load(&ctx, None) + self.try_load(&ctx, None, None) .await .expect("failed to load test tenant"), ctx, @@ -3400,6 +3420,7 @@ pub mod harness { &self, ctx: &RequestContext, remote_storage: Option, + deletion_queue: Option<&DeletionQueue>, ) -> anyhow::Result> { let walredo_mgr = Arc::new(TestRedoManager); @@ -3410,6 +3431,7 @@ pub mod harness { walredo_mgr, self.tenant_id, remote_storage, + deletion_queue.map(|q| q.new_client()), )); tenant .load(None, ctx) @@ -3949,7 +3971,7 @@ mod tests { std::fs::write(metadata_path, metadata_bytes)?; let err = harness - .try_load(&ctx, None) + .try_load(&ctx, None, None) .await .err() .expect("should fail"); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index bb8a0d7089..b0f925bdb7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -18,6 +18,7 @@ use utils::crashsafe; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; +use crate::deletion_queue::DeletionQueue; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; use crate::tenant::delete::DeleteTenantFlow; @@ -68,6 +69,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + deletion_queue: DeletionQueue, init_order: InitializationOrder, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants @@ -127,6 +129,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), + &deletion_queue, Some(init_order.clone()), &TENANTS, &ctx, @@ -164,6 +167,7 @@ pub(crate) fn schedule_local_tenant_processing( tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + deletion_queue: &DeletionQueue, init_order: Option, tenants: &'static tokio::sync::RwLock, ctx: &RequestContext, @@ -201,7 +205,14 @@ pub(crate) fn schedule_local_tenant_processing( let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() { info!("tenant {tenant_id} has attaching mark file, resuming its attach operation"); if let Some(remote_storage) = remote_storage { - match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) { + match Tenant::spawn_attach( + conf, + tenant_id, + broker_client, + remote_storage, + deletion_queue, + ctx, + ) { Ok(tenant) => tenant, Err(e) => { error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}"); @@ -224,6 +235,7 @@ pub(crate) fn schedule_local_tenant_processing( tenant_id, broker_client, remote_storage, + deletion_queue, init_order, tenants, ctx, @@ -352,6 +364,7 @@ pub async fn create_tenant( tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + deletion_queue: &DeletionQueue, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { tenant_map_insert(tenant_id, || { @@ -363,7 +376,7 @@ pub async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?; + schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, deletion_queue, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 @@ -512,6 +525,7 @@ pub async fn load_tenant( tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + deletion_queue: &DeletionQueue, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { tenant_map_insert(tenant_id, || { @@ -522,7 +536,7 @@ pub async fn load_tenant( .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; } - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx) + let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, deletion_queue, None, &TENANTS, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -589,6 +603,7 @@ pub async fn attach_tenant( tenant_conf: TenantConfOpt, broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, + deletion_queue: &DeletionQueue, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { // Temporary solution, proper one would be to resume deletion, but that needs more plumbing around Tenant::load/Tenant::attach @@ -609,7 +624,7 @@ pub async fn attach_tenant( .context("check for attach marker file existence")?; anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), deletion_queue, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 502e5ed44e..dd16116792 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4882,6 +4882,7 @@ mod tests { use utils::{id::TimelineId, lsn::Lsn}; + use crate::deletion_queue::DeletionQueue; use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer}; use super::{EvictionError, Timeline}; @@ -4904,9 +4905,13 @@ mod tests { }; GenericRemoteStorage::from_config(&config).unwrap() }; + let deletion_queue = DeletionQueue::new_mock(); let ctx = any_context(); - let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap(); + let tenant = harness + .try_load(&ctx, Some(remote_storage), Some(&deletion_queue)) + .await + .unwrap(); let timeline = tenant .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await @@ -4969,9 +4974,13 @@ mod tests { }; GenericRemoteStorage::from_config(&config).unwrap() }; + let deletion_queue = DeletionQueue::new_mock(); let ctx = any_context(); - let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap(); + let tenant = harness + .try_load(&ctx, Some(remote_storage), Some(&deletion_queue)) + .await + .unwrap(); let timeline = tenant .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await