From 0dc7a3fc1595fae0eb05d344d1c4409886acdea4 Mon Sep 17 00:00:00 2001 From: anastasia Date: Mon, 18 Oct 2021 13:27:25 +0300 Subject: [PATCH] Change tenant_mgr to use TenantState. It allows to avoid locking entire TENANTS list while one tenant is bootstrapping and prepares the code for remote storage integration. --- pageserver/src/tenant_mgr.rs | 135 ++++++++++++++++++++++++++--------- 1 file changed, 101 insertions(+), 34 deletions(-) diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 1712cf1b8a..be3a36fda4 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -9,8 +9,8 @@ use crate::PageServerConf; use anyhow::{anyhow, bail, Context, Result}; use lazy_static::lazy_static; use log::{debug, info}; -use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::fmt; use std::fs; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -19,13 +19,47 @@ use std::thread::JoinHandle; use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { - static ref REPOSITORY: Mutex>> = - Mutex::new(HashMap::new()); + static ref TENANTS: Mutex> = Mutex::new(HashMap::new()); } -fn access_repository() -> MutexGuard<'static, HashMap>> { - REPOSITORY.lock().unwrap() +struct Tenant { + state: TenantState, + repo: Option>, } + +#[derive(Debug)] +enum TenantState { + // This tenant only exists in cloud storage. It cannot be accessed. + CloudOnly, + // This tenant exists in cloud storage, and we are currently downloading it to local disk. + // It cannot be accessed yet, not until it's been fully downloaded to local disk. + Downloading, + // All data for this tenant is complete on local disk, but we haven't loaded the Repository, + // Timeline and Layer structs into memory yet, so it cannot be accessed yet. + //Ready, + // This tenant exists on local disk, and the layer map has been loaded into memory. + // The local disk might have some newer files that don't exist in cloud storage yet. + Active, + // This tenant exists on local disk, and the layer map has been loaded into memory. + // The local disk might have some newer files that don't exist in cloud storage yet. + // The tenant cannot be accessed anymore for any reason, but graceful shutdown. + //Stopping, +} + +impl fmt::Display for TenantState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TenantState::CloudOnly => f.write_str("CloudOnly"), + TenantState::Downloading => f.write_str("Downloading"), + TenantState::Active => f.write_str("Active"), + } + } +} + +fn access_tenants() -> MutexGuard<'static, HashMap> { + TENANTS.lock().unwrap() +} + struct TenantHandleEntry { checkpointer_handle: Option>, gc_handle: Option>, @@ -41,17 +75,25 @@ lazy_static! { static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); pub fn init(conf: &'static PageServerConf) { - let mut m = access_repository(); for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() { let tenantid = ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap(); - let repo = init_repo(conf, tenantid); + + { + let mut m = access_tenants(); + let tenant = Tenant { + state: TenantState::CloudOnly, + repo: None, + }; + m.insert(tenantid, tenant); + } + + init_repo(conf, tenantid); info!("initialized storage for tenant: {}", &tenantid); - m.insert(tenantid, repo); } } -fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc { +fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) { // Set up a WAL redo manager, for applying WAL records. let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); @@ -74,7 +116,10 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc init_timeline(o.get().as_ref(), timeline_id), - Entry::Vacant(v) => { - log::info!("New repo initialized"); - let new_repo = init_repo(conf, tenant_id); - init_timeline(new_repo.as_ref(), timeline_id); - v.insert(new_repo); + + { + let mut m = access_tenants(); + let mut tenant = m.get_mut(&tenant_id).unwrap(); + tenant.state = TenantState::Downloading; + match &tenant.repo { + Some(repo) => init_timeline(repo.as_ref(), timeline_id), + None => { + log::info!("Initialize new repo"); + } } } + + // init repo updates Tenant state + init_repo(conf, tenant_id); + let new_repo = get_repository_for_tenant(tenant_id).unwrap(); + init_timeline(new_repo.as_ref(), timeline_id); } fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) { @@ -125,8 +178,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) { pub fn shutdown_all_tenants() -> Result<()> { SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed); - let tenants = list_tenants()?; - for tenantid in tenants { + let tenantids = list_tenantids()?; + for tenantid in tenantids { stop_tenant_threads(tenantid); let repo = get_repository_for_tenant(tenantid)?; debug!("shutdown tenant {}", tenantid); @@ -140,25 +193,40 @@ pub fn create_repository_for_tenant( conf: &'static PageServerConf, tenantid: ZTenantId, ) -> Result<()> { - let mut m = access_repository(); - - // First check that the tenant doesn't exist already - if m.get(&tenantid).is_some() { - bail!("tenant {} already exists", tenantid); + { + let mut m = access_tenants(); + // First check that the tenant doesn't exist already + if m.get(&tenantid).is_some() { + bail!("tenant {} already exists", tenantid); + } + let tenant = Tenant { + state: TenantState::CloudOnly, + repo: None, + }; + m.insert(tenantid, tenant); } + let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?; - m.insert(tenantid, repo); + let mut m = access_tenants(); + let tenant = m.get_mut(&tenantid).unwrap(); + tenant.repo = Some(repo); + tenant.state = TenantState::Active; Ok(()) } pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { - access_repository() + let m = access_tenants(); + let tenant = m .get(&tenantid) - .map(Arc::clone) - .ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid)) + .ok_or_else(|| anyhow!("Tenant not found for tenant {}", tenantid)); + + match &tenant.unwrap().repo { + Some(repo) => Ok(Arc::clone(repo)), + None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid), + } } pub fn get_timeline_for_tenant( @@ -170,12 +238,11 @@ pub fn get_timeline_for_tenant( .with_context(|| format!("cannot fetch timeline {}", timelineid)) } -fn list_tenants() -> Result> { - let o = &mut REPOSITORY.lock().unwrap(); - - o.iter() - .map(|tenant| { - let (tenantid, _) = tenant; +fn list_tenantids() -> Result> { + let m = access_tenants(); + m.iter() + .map(|v| { + let (tenantid, _) = v; Ok(*tenantid) }) .collect()