diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 61028e23fe..ea112afe68 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -159,6 +159,9 @@ impl From for ApiError { // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls). ApiError::InternalServerError(anyhow::Error::new(e)) } + e @ GetTenantError::NotLoaded(_, _) => { + ApiError::InternalServerError(anyhow::Error::new(e)) + } } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9e9285a009..175f9458ce 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1179,6 +1179,7 @@ async fn get_active_tenant_with_timeout( let tenant = match mgr::get_tenant(tenant_id, false).await { Ok(tenant) => tenant, Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)), + Err(e @ GetTenantError::NotLoaded(_, _)) => return Err(GetActiveTenantError::NotFound(e)), Err(GetTenantError::NotActive(_)) => { unreachable!("we're calling get_tenant with active=false") } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d3cd914037..de46b2ab8e 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -9,7 +9,7 @@ use tokio::fs; use anyhow::Context; use once_cell::sync::Lazy; -use tokio::sync::RwLock; +use tokio::sync::{OnceCell, RwLock}; use tokio::task::JoinSet; use tracing::*; @@ -29,6 +29,49 @@ use utils::completion; use utils::fs_ext::PathExt; use utils::id::{TenantId, TimelineId}; +struct LazyTenantsMap { + conf: &'static PageServerConf, + map: HashMap>>, + broker_client: storage_broker::BrokerClientChannel, + remote_storage: Option, +} + +impl LazyTenantsMap { + fn load_tenant(&self, tenant_id: &TenantId) -> anyhow::Result> { + let tenant_path = self.conf.tenant_path(tenant_id); + let tenant_ignore_mark = self.conf.tenant_ignore_mark_file_path(*tenant_id); + if tenant_ignore_mark.exists() { + std::fs::remove_file(&tenant_ignore_mark) + .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; + } + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); + schedule_local_tenant_processing( + self.conf, + &tenant_path, + self.broker_client.clone(), + self.remote_storage.clone(), + None, + &ctx, + ) + .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}")) + } + + async fn try_load_tenant(&self, tenant_id: &TenantId) -> Result, GetTenantError> { + self.load_tenant(tenant_id) + .map_err(|e| GetTenantError::NotLoaded(*tenant_id, e)) + } + + async fn get(&self, tenant_id: &TenantId) -> Result<&Arc, GetTenantError> { + let tenant = self + .map + .get(tenant_id) + .ok_or(GetTenantError::NotFound(*tenant_id))?; + Ok(tenant + .get_or_try_init(|| self.try_load_tenant(tenant_id)) + .await?) + } +} + /// The tenants known to the pageserver. /// The enum variants are used to distinguish the different states that the pageserver can be in. enum TenantsMap { @@ -36,23 +79,27 @@ enum TenantsMap { Initializing, /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded. /// New tenants can be added using [`tenant_map_insert`]. - Open(HashMap>), + Open(LazyTenantsMap), /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`]. /// Existing tenants are still accessible, but no new tenants can be created. ShuttingDown(HashMap>), } impl TenantsMap { - fn get(&self, tenant_id: &TenantId) -> Option<&Arc> { + async fn get(&self, tenant_id: &TenantId) -> Result<&Arc, GetTenantError> { match self { - TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id), + TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)), + TenantsMap::Open(m) => m.get(tenant_id).await, + TenantsMap::ShuttingDown(m) => { + m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id)) + } } } - fn remove(&mut self, tenant_id: &TenantId) -> Option> { + fn remove(&mut self, tenant_id: &TenantId) -> bool { match self { - TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), + TenantsMap::Initializing => false, + TenantsMap::Open(m) => m.map.remove(tenant_id).is_some(), + TenantsMap::ShuttingDown(m) => m.remove(tenant_id).is_some(), } } } @@ -67,7 +114,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: (completion::Completion, completion::Barrier), + _init_done: (completion::Completion, completion::Barrier), ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); @@ -78,8 +125,6 @@ pub async fn init_tenant_mgr( .await .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?; - let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn); - loop { match dir_entries.next_entry().await { Ok(None) => break, @@ -119,21 +164,15 @@ pub async fn init_tenant_mgr( continue; } - match schedule_local_tenant_processing( - conf, - &tenant_dir_path, - broker_client.clone(), - remote_storage.clone(), - Some(init_done.clone()), - &ctx, - ) { - Ok(tenant) => { - tenants.insert(tenant.tenant_id(), tenant); - } - Err(e) => { - error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}"); - } - } + let tenant_id = tenant_dir_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .with_context(|| { + format!("Could not parse tenant id out of the tenant dir name in path {tenant_dir_path:?}") + })?; + tenants.insert(tenant_id, OnceCell::new()); } } Err(e) => { @@ -151,7 +190,12 @@ pub async fn init_tenant_mgr( let mut tenants_map = TENANTS.write().await; assert!(matches!(&*tenants_map, &TenantsMap::Initializing)); - *tenants_map = TenantsMap::Open(tenants); + *tenants_map = TenantsMap::Open(LazyTenantsMap { + conf, + broker_client, + remote_storage, + map: tenants, + }); Ok(()) } @@ -247,10 +291,17 @@ pub async fn shutdown_all_tenants() { info!("tenants map is empty"); return; } - TenantsMap::Open(tenants) => { - let tenants_clone = tenants.clone(); - *m = TenantsMap::ShuttingDown(std::mem::take(tenants)); - tenants_clone + TenantsMap::Open(lazy) => { + let online_tenants: Vec> = lazy + .map + .iter() + .filter_map(|(_, v)| v.get()) + .cloned() + .collect(); + *m = TenantsMap::ShuttingDown(HashMap::from_iter( + online_tenants.iter().map(|t| (t.tenant_id(), t.clone())), + )); + online_tenants } TenantsMap::ShuttingDown(_) => { error!("already shutting down, this function isn't supposed to be called more than once"); @@ -277,7 +328,8 @@ pub async fn shutdown_all_tenants() { // It's mesed up. let mut join_set = JoinSet::new(); let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len()); - for (tenant_id, tenant) in tenants_to_shut_down { + for tenant in tenants_to_shut_down { + let tenant_id = tenant.tenant_id(); join_set.spawn( async move { match tenant.set_stopping().await { @@ -421,6 +473,8 @@ pub enum GetTenantError { NotFound(TenantId), #[error("Tenant {0} is not active")] NotActive(TenantId), + #[error("Tenant {0} can not be loaded")] + NotLoaded(TenantId, anyhow::Error), } /// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query. @@ -430,9 +484,7 @@ pub async fn get_tenant( active_only: bool, ) -> Result, GetTenantError> { let m = TENANTS.read().await; - let tenant = m - .get(&tenant_id) - .ok_or(GetTenantError::NotFound(tenant_id))?; + let tenant = m.get(&tenant_id).await?; if active_only && !tenant.is_active() { Err(GetTenantError::NotActive(tenant_id)) } else { @@ -560,13 +612,18 @@ pub enum TenantMapListError { /// pub async fn list_tenants() -> Result, TenantMapListError> { let tenants = TENANTS.read().await; - let m = match &*tenants { - TenantsMap::Initializing => return Err(TenantMapListError::Initializing), - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m, - }; - Ok(m.iter() - .map(|(id, tenant)| (*id, tenant.current_state())) - .collect()) + match &*tenants { + TenantsMap::Initializing => Err(TenantMapListError::Initializing), + TenantsMap::Open(m) => Ok(m + .map + .iter() + .filter_map(|(k, v)| v.get().map(|tenant| (*k, tenant.current_state()))) + .collect()), + TenantsMap::ShuttingDown(m) => Ok(m + .iter() + .map(|(id, tenant)| (*id, tenant.current_state())) + .collect()), + } } /// Execute Attach mgmt API command. @@ -634,22 +691,23 @@ where F: FnOnce() -> anyhow::Result>, { let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), - TenantsMap::Open(m) => m, - }; - match m.entry(tenant_id) { - hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists( - tenant_id, - e.get().current_state(), - )), - hash_map::Entry::Vacant(v) => match insert_fn() { - Ok(tenant) => { - v.insert(tenant.clone()); - Ok(tenant) - } - Err(e) => Err(TenantMapInsertError::Closure(e)), + match &mut *guard { + TenantsMap::Initializing => Err(TenantMapInsertError::StillInitializing), + TenantsMap::ShuttingDown(_) => Err(TenantMapInsertError::ShuttingDown), + TenantsMap::Open(m) => match m.map.entry(tenant_id) { + hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists( + tenant_id, + e.get() + .get() + .map_or(TenantState::Loading, |tenant| tenant.current_state()), + )), + hash_map::Entry::Vacant(v) => match insert_fn() { + Ok(tenant) => { + v.insert(OnceCell::new_with(Some(tenant.clone()))); + Ok(tenant) + } + Err(e) => Err(TenantMapInsertError::Closure(e)), + }, }, } } @@ -671,8 +729,8 @@ where // avoid holding the lock for the entire process. { let tenants_accessor = TENANTS.write().await; - match tenants_accessor.get(&tenant_id) { - Some(tenant) => { + match tenants_accessor.get(&tenant_id).await { + Ok(tenant) => { let tenant = Arc::clone(tenant); // don't hold TENANTS lock while set_stopping waits for activation to finish drop(tenants_accessor); @@ -689,7 +747,7 @@ where } } } - None => return Err(TenantStateError::NotFound(tenant_id)), + Err(_) => return Err(TenantStateError::NotFound(tenant_id)), } } @@ -704,18 +762,18 @@ where { Ok(hook_value) => { let mut tenants_accessor = TENANTS.write().await; - if tenants_accessor.remove(&tenant_id).is_none() { + if !tenants_accessor.remove(&tenant_id) { warn!("Tenant {tenant_id} got removed from memory before operation finished"); } Ok(hook_value) } Err(e) => { let tenants_accessor = TENANTS.read().await; - match tenants_accessor.get(&tenant_id) { - Some(tenant) => { + match tenants_accessor.get(&tenant_id).await { + Ok(tenant) => { tenant.set_broken(e.to_string()).await; } - None => { + Err(_) => { warn!("Tenant {tenant_id} got removed from memory"); return Err(TenantStateError::NotFound(tenant_id)); } @@ -736,13 +794,7 @@ pub async fn immediate_gc( gc_req: TimelineGcRequest, ctx: &RequestContext, ) -> Result>, ApiError> { - let guard = TENANTS.read().await; - let tenant = guard - .get(&tenant_id) - .map(Arc::clone) - .with_context(|| format!("tenant {tenant_id}")) - .map_err(ApiError::NotFound)?; - + let tenant = get_tenant(tenant_id, false).await?; let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon()); // Use tenant's pitr setting let pitr = tenant.get_pitr_interval(); @@ -772,10 +824,6 @@ pub async fn immediate_gc( Ok(()) } ); - - // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task - drop(guard); - Ok(wait_task_done) } @@ -789,6 +837,7 @@ pub async fn immediate_compact( let tenant = guard .get(&tenant_id) + .await .map(Arc::clone) .with_context(|| format!("tenant {tenant_id}")) .map_err(ApiError::NotFound)?;