mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Lazy loadig of tenants on pageserver startup
This commit is contained in:
@@ -159,6 +159,9 @@ impl From<GetTenantError> for ApiError {
|
||||
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
e @ GetTenantError::NotLoaded(_, _) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1179,6 +1179,7 @@ async fn get_active_tenant_with_timeout(
|
||||
let tenant = match mgr::get_tenant(tenant_id, false).await {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(e @ GetTenantError::NotLoaded(_, _)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(GetTenantError::NotActive(_)) => {
|
||||
unreachable!("we're calling get_tenant with active=false")
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use tokio::fs;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{OnceCell, RwLock};
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::*;
|
||||
|
||||
@@ -29,6 +29,49 @@ use utils::completion;
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
struct LazyTenantsMap {
|
||||
conf: &'static PageServerConf,
|
||||
map: HashMap<TenantId, OnceCell<Arc<Tenant>>>,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
}
|
||||
|
||||
impl LazyTenantsMap {
|
||||
fn load_tenant(&self, tenant_id: &TenantId) -> anyhow::Result<Arc<Tenant>> {
|
||||
let tenant_path = self.conf.tenant_path(tenant_id);
|
||||
let tenant_ignore_mark = self.conf.tenant_ignore_mark_file_path(*tenant_id);
|
||||
if tenant_ignore_mark.exists() {
|
||||
std::fs::remove_file(&tenant_ignore_mark)
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
schedule_local_tenant_processing(
|
||||
self.conf,
|
||||
&tenant_path,
|
||||
self.broker_client.clone(),
|
||||
self.remote_storage.clone(),
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))
|
||||
}
|
||||
|
||||
async fn try_load_tenant(&self, tenant_id: &TenantId) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
self.load_tenant(tenant_id)
|
||||
.map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))
|
||||
}
|
||||
|
||||
async fn get(&self, tenant_id: &TenantId) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
let tenant = self
|
||||
.map
|
||||
.get(tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(*tenant_id))?;
|
||||
Ok(tenant
|
||||
.get_or_try_init(|| self.try_load_tenant(tenant_id))
|
||||
.await?)
|
||||
}
|
||||
}
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
enum TenantsMap {
|
||||
@@ -36,23 +79,27 @@ enum TenantsMap {
|
||||
Initializing,
|
||||
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
||||
/// New tenants can be added using [`tenant_map_insert`].
|
||||
Open(HashMap<TenantId, Arc<Tenant>>),
|
||||
Open(LazyTenantsMap),
|
||||
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
|
||||
/// Existing tenants are still accessible, but no new tenants can be created.
|
||||
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
|
||||
}
|
||||
|
||||
impl TenantsMap {
|
||||
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
||||
async fn get(&self, tenant_id: &TenantId) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
|
||||
TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)),
|
||||
TenantsMap::Open(m) => m.get(tenant_id).await,
|
||||
TenantsMap::ShuttingDown(m) => {
|
||||
m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id))
|
||||
}
|
||||
}
|
||||
}
|
||||
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
||||
fn remove(&mut self, tenant_id: &TenantId) -> bool {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
|
||||
TenantsMap::Initializing => false,
|
||||
TenantsMap::Open(m) => m.map.remove(tenant_id).is_some(),
|
||||
TenantsMap::ShuttingDown(m) => m.remove(tenant_id).is_some(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,7 +114,7 @@ pub async fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_done: (completion::Completion, completion::Barrier),
|
||||
_init_done: (completion::Completion, completion::Barrier),
|
||||
) -> anyhow::Result<()> {
|
||||
// Scan local filesystem for attached tenants
|
||||
let tenants_dir = conf.tenants_path();
|
||||
@@ -78,8 +125,6 @@ pub async fn init_tenant_mgr(
|
||||
.await
|
||||
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
||||
|
||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
||||
|
||||
loop {
|
||||
match dir_entries.next_entry().await {
|
||||
Ok(None) => break,
|
||||
@@ -119,21 +164,15 @@ pub async fn init_tenant_mgr(
|
||||
continue;
|
||||
}
|
||||
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
&tenant_dir_path,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
Some(init_done.clone()),
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
tenants.insert(tenant.tenant_id(), tenant);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
|
||||
}
|
||||
}
|
||||
let tenant_id = tenant_dir_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
.with_context(|| {
|
||||
format!("Could not parse tenant id out of the tenant dir name in path {tenant_dir_path:?}")
|
||||
})?;
|
||||
tenants.insert(tenant_id, OnceCell::new());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -151,7 +190,12 @@ pub async fn init_tenant_mgr(
|
||||
|
||||
let mut tenants_map = TENANTS.write().await;
|
||||
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
|
||||
*tenants_map = TenantsMap::Open(tenants);
|
||||
*tenants_map = TenantsMap::Open(LazyTenantsMap {
|
||||
conf,
|
||||
broker_client,
|
||||
remote_storage,
|
||||
map: tenants,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -247,10 +291,17 @@ pub async fn shutdown_all_tenants() {
|
||||
info!("tenants map is empty");
|
||||
return;
|
||||
}
|
||||
TenantsMap::Open(tenants) => {
|
||||
let tenants_clone = tenants.clone();
|
||||
*m = TenantsMap::ShuttingDown(std::mem::take(tenants));
|
||||
tenants_clone
|
||||
TenantsMap::Open(lazy) => {
|
||||
let online_tenants: Vec<Arc<Tenant>> = lazy
|
||||
.map
|
||||
.iter()
|
||||
.filter_map(|(_, v)| v.get())
|
||||
.cloned()
|
||||
.collect();
|
||||
*m = TenantsMap::ShuttingDown(HashMap::from_iter(
|
||||
online_tenants.iter().map(|t| (t.tenant_id(), t.clone())),
|
||||
));
|
||||
online_tenants
|
||||
}
|
||||
TenantsMap::ShuttingDown(_) => {
|
||||
error!("already shutting down, this function isn't supposed to be called more than once");
|
||||
@@ -277,7 +328,8 @@ pub async fn shutdown_all_tenants() {
|
||||
// It's mesed up.
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
||||
for (tenant_id, tenant) in tenants_to_shut_down {
|
||||
for tenant in tenants_to_shut_down {
|
||||
let tenant_id = tenant.tenant_id();
|
||||
join_set.spawn(
|
||||
async move {
|
||||
match tenant.set_stopping().await {
|
||||
@@ -421,6 +473,8 @@ pub enum GetTenantError {
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
#[error("Tenant {0} can not be loaded")]
|
||||
NotLoaded(TenantId, anyhow::Error),
|
||||
}
|
||||
|
||||
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
||||
@@ -430,9 +484,7 @@ pub async fn get_tenant(
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
let m = TENANTS.read().await;
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
||||
let tenant = m.get(&tenant_id).await?;
|
||||
if active_only && !tenant.is_active() {
|
||||
Err(GetTenantError::NotActive(tenant_id))
|
||||
} else {
|
||||
@@ -560,13 +612,18 @@ pub enum TenantMapListError {
|
||||
///
|
||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().await;
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
|
||||
};
|
||||
Ok(m.iter()
|
||||
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||
.collect())
|
||||
match &*tenants {
|
||||
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
|
||||
TenantsMap::Open(m) => Ok(m
|
||||
.map
|
||||
.iter()
|
||||
.filter_map(|(k, v)| v.get().map(|tenant| (*k, tenant.current_state())))
|
||||
.collect()),
|
||||
TenantsMap::ShuttingDown(m) => Ok(m
|
||||
.iter()
|
||||
.map(|(id, tenant)| (*id, tenant.current_state()))
|
||||
.collect()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute Attach mgmt API command.
|
||||
@@ -634,22 +691,23 @@ where
|
||||
F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
|
||||
{
|
||||
let mut guard = TENANTS.write().await;
|
||||
let m = match &mut *guard {
|
||||
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
|
||||
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
match m.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
|
||||
tenant_id,
|
||||
e.get().current_state(),
|
||||
)),
|
||||
hash_map::Entry::Vacant(v) => match insert_fn() {
|
||||
Ok(tenant) => {
|
||||
v.insert(tenant.clone());
|
||||
Ok(tenant)
|
||||
}
|
||||
Err(e) => Err(TenantMapInsertError::Closure(e)),
|
||||
match &mut *guard {
|
||||
TenantsMap::Initializing => Err(TenantMapInsertError::StillInitializing),
|
||||
TenantsMap::ShuttingDown(_) => Err(TenantMapInsertError::ShuttingDown),
|
||||
TenantsMap::Open(m) => match m.map.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
|
||||
tenant_id,
|
||||
e.get()
|
||||
.get()
|
||||
.map_or(TenantState::Loading, |tenant| tenant.current_state()),
|
||||
)),
|
||||
hash_map::Entry::Vacant(v) => match insert_fn() {
|
||||
Ok(tenant) => {
|
||||
v.insert(OnceCell::new_with(Some(tenant.clone())));
|
||||
Ok(tenant)
|
||||
}
|
||||
Err(e) => Err(TenantMapInsertError::Closure(e)),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -671,8 +729,8 @@ where
|
||||
// avoid holding the lock for the entire process.
|
||||
{
|
||||
let tenants_accessor = TENANTS.write().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
match tenants_accessor.get(&tenant_id).await {
|
||||
Ok(tenant) => {
|
||||
let tenant = Arc::clone(tenant);
|
||||
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
||||
drop(tenants_accessor);
|
||||
@@ -689,7 +747,7 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
Err(_) => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -704,18 +762,18 @@ where
|
||||
{
|
||||
Ok(hook_value) => {
|
||||
let mut tenants_accessor = TENANTS.write().await;
|
||||
if tenants_accessor.remove(&tenant_id).is_none() {
|
||||
if !tenants_accessor.remove(&tenant_id) {
|
||||
warn!("Tenant {tenant_id} got removed from memory before operation finished");
|
||||
}
|
||||
Ok(hook_value)
|
||||
}
|
||||
Err(e) => {
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
match tenants_accessor.get(&tenant_id).await {
|
||||
Ok(tenant) => {
|
||||
tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
None => {
|
||||
Err(_) => {
|
||||
warn!("Tenant {tenant_id} got removed from memory");
|
||||
return Err(TenantStateError::NotFound(tenant_id));
|
||||
}
|
||||
@@ -736,13 +794,7 @@ pub async fn immediate_gc(
|
||||
gc_req: TimelineGcRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
|
||||
let guard = TENANTS.read().await;
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let tenant = get_tenant(tenant_id, false).await?;
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
let pitr = tenant.get_pitr_interval();
|
||||
@@ -772,10 +824,6 @@ pub async fn immediate_gc(
|
||||
Ok(())
|
||||
}
|
||||
);
|
||||
|
||||
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
|
||||
drop(guard);
|
||||
|
||||
Ok(wait_task_done)
|
||||
}
|
||||
|
||||
@@ -789,6 +837,7 @@ pub async fn immediate_compact(
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.await
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
Reference in New Issue
Block a user