mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
Change tenant_mgr to use TenantState.
This avoids locking the entire TENANTS list while one tenant is bootstrapping, and prepares the code for remote storage integration.
This commit is contained in:
@@ -9,8 +9,8 @@ use crate::PageServerConf;
|
|||||||
use anyhow::{anyhow, bail, Context, Result};
|
use anyhow::{anyhow, bail, Context, Result};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::{debug, info};
|
use log::{debug, info};
|
||||||
use std::collections::hash_map::Entry;
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::fmt;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@@ -19,13 +19,47 @@ use std::thread::JoinHandle;
|
|||||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref REPOSITORY: Mutex<HashMap<ZTenantId, Arc<dyn Repository>>> =
|
static ref TENANTS: Mutex<HashMap<ZTenantId, Tenant>> = Mutex::new(HashMap::new());
|
||||||
Mutex::new(HashMap::new());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn access_repository() -> MutexGuard<'static, HashMap<ZTenantId, Arc<dyn Repository>>> {
|
struct Tenant {
|
||||||
REPOSITORY.lock().unwrap()
|
state: TenantState,
|
||||||
|
repo: Option<Arc<dyn Repository>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum TenantState {
|
||||||
|
// This tenant only exists in cloud storage. It cannot be accessed.
|
||||||
|
CloudOnly,
|
||||||
|
// This tenant exists in cloud storage, and we are currently downloading it to local disk.
|
||||||
|
// It cannot be accessed yet, not until it's been fully downloaded to local disk.
|
||||||
|
Downloading,
|
||||||
|
// All data for this tenant is complete on local disk, but we haven't loaded the Repository,
|
||||||
|
// Timeline and Layer structs into memory yet, so it cannot be accessed yet.
|
||||||
|
//Ready,
|
||||||
|
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||||
|
// The local disk might have some newer files that don't exist in cloud storage yet.
|
||||||
|
Active,
|
||||||
|
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||||
|
// The local disk might have some newer files that don't exist in cloud storage yet.
|
||||||
|
// The tenant cannot be accessed anymore for any purpose other than graceful shutdown.
|
||||||
|
//Stopping,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for TenantState {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
TenantState::CloudOnly => f.write_str("CloudOnly"),
|
||||||
|
TenantState::Downloading => f.write_str("Downloading"),
|
||||||
|
TenantState::Active => f.write_str("Active"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||||
|
TENANTS.lock().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
struct TenantHandleEntry {
|
struct TenantHandleEntry {
|
||||||
checkpointer_handle: Option<JoinHandle<()>>,
|
checkpointer_handle: Option<JoinHandle<()>>,
|
||||||
gc_handle: Option<JoinHandle<()>>,
|
gc_handle: Option<JoinHandle<()>>,
|
||||||
@@ -41,17 +75,25 @@ lazy_static! {
|
|||||||
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
|
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
|
||||||
|
|
||||||
pub fn init(conf: &'static PageServerConf) {
|
pub fn init(conf: &'static PageServerConf) {
|
||||||
let mut m = access_repository();
|
|
||||||
for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() {
|
for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() {
|
||||||
let tenantid =
|
let tenantid =
|
||||||
ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap();
|
ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap();
|
||||||
let repo = init_repo(conf, tenantid);
|
|
||||||
|
{
|
||||||
|
let mut m = access_tenants();
|
||||||
|
let tenant = Tenant {
|
||||||
|
state: TenantState::CloudOnly,
|
||||||
|
repo: None,
|
||||||
|
};
|
||||||
|
m.insert(tenantid, tenant);
|
||||||
|
}
|
||||||
|
|
||||||
|
init_repo(conf, tenantid);
|
||||||
info!("initialized storage for tenant: {}", &tenantid);
|
info!("initialized storage for tenant: {}", &tenantid);
|
||||||
m.insert(tenantid, repo);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc<LayeredRepository> {
|
fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
|
||||||
// Set up a WAL redo manager, for applying WAL records.
|
// Set up a WAL redo manager, for applying WAL records.
|
||||||
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
||||||
|
|
||||||
@@ -74,7 +116,10 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc<Layered
|
|||||||
|
|
||||||
handles.insert(tenant_id, h);
|
handles.insert(tenant_id, h);
|
||||||
|
|
||||||
repo
|
let mut m = access_tenants();
|
||||||
|
let tenant = m.get_mut(&tenant_id).unwrap();
|
||||||
|
tenant.repo = Some(repo);
|
||||||
|
tenant.state = TenantState::Active;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO kb Currently unused function, will later be used when the relish storage downloads a new layer.
|
// TODO kb Currently unused function, will later be used when the relish storage downloads a new layer.
|
||||||
@@ -89,15 +134,23 @@ pub fn register_relish_download(
|
|||||||
tenant_id,
|
tenant_id,
|
||||||
timeline_id
|
timeline_id
|
||||||
);
|
);
|
||||||
match access_repository().entry(tenant_id) {
|
|
||||||
Entry::Occupied(o) => init_timeline(o.get().as_ref(), timeline_id),
|
{
|
||||||
Entry::Vacant(v) => {
|
let mut m = access_tenants();
|
||||||
log::info!("New repo initialized");
|
let mut tenant = m.get_mut(&tenant_id).unwrap();
|
||||||
let new_repo = init_repo(conf, tenant_id);
|
tenant.state = TenantState::Downloading;
|
||||||
init_timeline(new_repo.as_ref(), timeline_id);
|
match &tenant.repo {
|
||||||
v.insert(new_repo);
|
Some(repo) => init_timeline(repo.as_ref(), timeline_id),
|
||||||
|
None => {
|
||||||
|
log::info!("Initialize new repo");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// init repo updates Tenant state
|
||||||
|
init_repo(conf, tenant_id);
|
||||||
|
let new_repo = get_repository_for_tenant(tenant_id).unwrap();
|
||||||
|
init_timeline(new_repo.as_ref(), timeline_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {
|
fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {
|
||||||
@@ -125,8 +178,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) {
|
|||||||
pub fn shutdown_all_tenants() -> Result<()> {
|
pub fn shutdown_all_tenants() -> Result<()> {
|
||||||
SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
|
SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
|
||||||
|
|
||||||
let tenants = list_tenants()?;
|
let tenantids = list_tenantids()?;
|
||||||
for tenantid in tenants {
|
for tenantid in tenantids {
|
||||||
stop_tenant_threads(tenantid);
|
stop_tenant_threads(tenantid);
|
||||||
let repo = get_repository_for_tenant(tenantid)?;
|
let repo = get_repository_for_tenant(tenantid)?;
|
||||||
debug!("shutdown tenant {}", tenantid);
|
debug!("shutdown tenant {}", tenantid);
|
||||||
@@ -140,25 +193,40 @@ pub fn create_repository_for_tenant(
|
|||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
tenantid: ZTenantId,
|
tenantid: ZTenantId,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut m = access_repository();
|
{
|
||||||
|
let mut m = access_tenants();
|
||||||
// First check that the tenant doesn't exist already
|
// First check that the tenant doesn't exist already
|
||||||
if m.get(&tenantid).is_some() {
|
if m.get(&tenantid).is_some() {
|
||||||
bail!("tenant {} already exists", tenantid);
|
bail!("tenant {} already exists", tenantid);
|
||||||
|
}
|
||||||
|
let tenant = Tenant {
|
||||||
|
state: TenantState::CloudOnly,
|
||||||
|
repo: None,
|
||||||
|
};
|
||||||
|
m.insert(tenantid, tenant);
|
||||||
}
|
}
|
||||||
|
|
||||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
||||||
let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?;
|
let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?;
|
||||||
|
|
||||||
m.insert(tenantid, repo);
|
let mut m = access_tenants();
|
||||||
|
let tenant = m.get_mut(&tenantid).unwrap();
|
||||||
|
tenant.repo = Some(repo);
|
||||||
|
tenant.state = TenantState::Active;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
|
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
|
||||||
access_repository()
|
let m = access_tenants();
|
||||||
|
let tenant = m
|
||||||
.get(&tenantid)
|
.get(&tenantid)
|
||||||
.map(Arc::clone)
|
.ok_or_else(|| anyhow!("Tenant not found for tenant {}", tenantid));
|
||||||
.ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid))
|
|
||||||
|
match &tenant.unwrap().repo {
|
||||||
|
Some(repo) => Ok(Arc::clone(repo)),
|
||||||
|
None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_timeline_for_tenant(
|
pub fn get_timeline_for_tenant(
|
||||||
@@ -170,12 +238,11 @@ pub fn get_timeline_for_tenant(
|
|||||||
.with_context(|| format!("cannot fetch timeline {}", timelineid))
|
.with_context(|| format!("cannot fetch timeline {}", timelineid))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn list_tenants() -> Result<Vec<ZTenantId>> {
|
fn list_tenantids() -> Result<Vec<ZTenantId>> {
|
||||||
let o = &mut REPOSITORY.lock().unwrap();
|
let m = access_tenants();
|
||||||
|
m.iter()
|
||||||
o.iter()
|
.map(|v| {
|
||||||
.map(|tenant| {
|
let (tenantid, _) = v;
|
||||||
let (tenantid, _) = tenant;
|
|
||||||
Ok(*tenantid)
|
Ok(*tenantid)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
|
|||||||
Reference in New Issue
Block a user