diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 798b9f258b..2acd47e422 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -408,7 +408,7 @@ fn start_pageserver( // Scan the local 'tenants/' directory and start loading the tenants let deletion_queue_client = deletion_queue.new_client(); - BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( + let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, TenantSharedResources { broker_client: broker_client.clone(), @@ -418,6 +418,7 @@ fn start_pageserver( order, shutdown_pageserver.clone(), ))?; + let tenant_manager = Arc::new(tenant_manager); BACKGROUND_RUNTIME.spawn({ let init_done_rx = init_done_rx; @@ -546,6 +547,7 @@ fn start_pageserver( let router_state = Arc::new( http::routes::State::new( conf, + tenant_manager, http_auth.clone(), remote_storage.clone(), broker_client.clone(), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 591e20ae9a..7d39badfdc 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -36,8 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; use crate::tenant::mgr::{ - GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError, - TenantSlotUpsertError, TenantStateError, + GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError, + TenantSlotError, TenantSlotUpsertError, TenantStateError, }; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; @@ -64,6 +64,7 @@ use super::models::ConfigureFailpointsRequest; pub struct State { conf: &'static PageServerConf, + tenant_manager: Arc, auth: Option>, allowlist_routes: Vec, remote_storage: Option, @@ -75,6 +76,7 @@ pub struct State { impl State { pub fn new( conf: &'static PageServerConf, + tenant_manager: Arc, auth: Option>, remote_storage: Option, broker_client: storage_broker::BrokerClientChannel, @@ -87,6 +89,7 @@ impl State { .collect::>(); Ok(Self { conf, + tenant_manager, auth, allowlist_routes, remote_storage, @@ -1142,20 +1145,14 @@ async fn put_tenant_location_config_handler( let location_conf = LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?; - mgr::upsert_location( - state.conf, - tenant_id, - location_conf, - state.broker_client.clone(), - state.remote_storage.clone(), - state.deletion_queue_client.clone(), - &ctx, - ) - .await - // TODO: badrequest assumes the caller was asking for something unreasonable, but in - // principle we might have hit something like concurrent API calls to the same tenant, - // which is not a 400 but a 409. - .map_err(ApiError::BadRequest)?; + state + .tenant_manager + .upsert_location(tenant_id, location_conf, &ctx) + .await + // TODO: badrequest assumes the caller was asking for something unreasonable, but in + // principle we might have hit something like concurrent API calls to the same tenant, + // which is not a 400 but a 409. + .map_err(ApiError::BadRequest)?; json_response(StatusCode::OK, ()) } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 7bdebcb2bf..284a85df6e 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -184,6 +184,12 @@ async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> { Ok(()) } +pub struct TenantManager { + conf: &'static PageServerConf, + tenants: &'static std::sync::RwLock, + resources: TenantSharedResources, +} + fn emergency_generations( tenant_confs: &HashMap>, ) -> HashMap { @@ -350,7 +356,7 @@ pub async fn init_tenant_mgr( resources: TenantSharedResources, init_order: InitializationOrder, cancel: CancellationToken, -) -> anyhow::Result<()> { +) -> anyhow::Result { let mut tenants = HashMap::new(); let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn); @@ -450,7 +456,12 @@ pub async fn init_tenant_mgr( let mut tenants_map = TENANTS.write().unwrap(); assert!(matches!(&*tenants_map, &TenantsMap::Initializing)); *tenants_map = TenantsMap::Open(tenants); - Ok(()) + + Ok(TenantManager { + conf: conf, + tenants: &TENANTS, + resources, + }) } #[allow(clippy::too_many_arguments)] @@ -750,14 +761,13 @@ pub(crate) async fn set_new_tenant_config( Ok(()) } +impl TenantManager { + #[instrument(skip_all, fields(%tenant_id))] pub(crate) async fn upsert_location( - conf: &'static PageServerConf, + &self, tenant_id: TenantId, new_location_config: LocationConf, - broker_client: storage_broker::BrokerClientChannel, - remote_storage: Option, - deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), anyhow::Error> { info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); @@ -766,7 +776,7 @@ pub(crate) async fn upsert_location( // then we do not need to set the slot to InProgress, we can just call into the // existng tenant. { - let locked = TENANTS.read().unwrap(); + let locked = self.tenants.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_id)?; match (&new_location_config.mode, peek_slot) { (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { @@ -778,9 +788,9 @@ pub(crate) async fn upsert_location( new_location_config, )?); - // Persist the new config in the background, to avoid holding up any - // locks while we do so. - // TODO + // Persist the new config in the background, to avoid holding up any + // locks while we do so. + // TODO return Ok(()); } else { @@ -811,7 +821,7 @@ pub(crate) async fn upsert_location( AttachmentMode::Single | AttachmentMode::Multi => { // Before we leave our state as the presumed holder of the latest generation, // flush any outstanding deletions to reduce the risk of leaking objects. - deletion_queue_client.flush_advisory() + self.resources.deletion_queue_client.flush_advisory() } AttachmentMode::Stale => { // If we're stale there's not point trying to flush deletions @@ -830,21 +840,21 @@ pub(crate) async fn upsert_location( let new_slot = match &new_location_config.mode { LocationMode::Secondary(_) => { - let tenant_path = conf.tenant_path(&tenant_id); + let tenant_path = self.conf.tenant_path(&tenant_id); // Directory doesn't need to be fsync'd because if we crash it can // safely be recreated next time this tenant location is configured. unsafe_create_dir_all(&tenant_path) .await .with_context(|| format!("Creating {tenant_path}"))?; - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config) .await .map_err(SetNewTenantConfigError::Persist)?; TenantSlot::Secondary } LocationMode::Attached(_attach_config) => { - let timelines_path = conf.timelines_path(&tenant_id); + let timelines_path = self.conf.timelines_path(&tenant_id); // Directory doesn't need to be fsync'd because we do not depend on // it to exist after crashes: it may be recreated when tenant is @@ -853,20 +863,16 @@ pub(crate) async fn upsert_location( .await .with_context(|| format!("Creating {timelines_path}"))?; - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config) .await .map_err(SetNewTenantConfigError::Persist)?; let tenant = match Tenant::spawn_attach( - conf, + self.conf, tenant_id, - TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client, - }, + self.resources.clone(), AttachedTenantConf::try_from(new_location_config)?, - &TENANTS, + self.tenants, // The LocationConf API does not use marker files, because we have Secondary // locations where the directory's existence is not a signal that it contains // all timelines. See https://github.com/neondatabase/neon/issues/5550 @@ -876,7 +882,7 @@ pub(crate) async fn upsert_location( Ok(tenant) => tenant, Err(e) => { error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}"); - Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}")) + Tenant::create_broken_tenant(self.conf, tenant_id, format!("{e:#}")) } }; @@ -887,6 +893,7 @@ pub(crate) async fn upsert_location( Ok(()) } +} #[derive(Debug, thiserror::Error)] pub(crate) enum GetTenantError {