tenant::mgr: explicit tracking of initializing & shutting-down states

This patch wraps the tenants hashmap in an enum that represents the
tenant manager's three major states:
- Initializing
- Open for business
- Shutting down
See the enum doc comments for details.
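In outline, the new type looks like this (the full definitions, including
doc comments, appear in the diff below):

    enum TenantsMap {
        Initializing,
        Open(HashMap<TenantId, Arc<Tenant>>),
        ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
    }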

As a result, all users of `TENANTS` are now forced to distinguish
between those states.
The only major change is in `run_if_no_tenant_in_memory`, which,
before this patch, was used by the /attach and /load endpoints.
This patch rewrites that method under the name `tenant_map_insert`,
replacing the anyhow::Result with a std Result and a dedicated error
type.
Introducing this error type allows `tenant_map_insert` to be used in
`tenant_create` as well, thereby unifying all code paths that create
tenant objects.
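For example, the attach path (shown in full in the diff below) now goes
through the shared helper:

    tenant_map_insert(tenant_id, |vacant_entry| {
        let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx);
        vacant_entry.insert(tenant);
        Ok(())
    })
    .await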

This is beneficial because we can now systematically prevent tenants
from being created, attached, or `/load`ed during pageserver shutdown.
The management API remains available, but the endpoints that create
new tenants will fail with an error.
More work would need to be done to properly distinguish these errors
through HTTP status codes such as 503.
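As an illustration, here is a minimal sketch of what that mapping might
look like, assuming a hypothetical `ApiError::ServiceUnavailable` variant
that does not exist in this commit:

    fn apierror_from_tenant_map_insert_error(e: TenantMapInsertError) -> ApiError {
        match e {
            // Transient conditions: a 503 would tell the control plane to
            // retry later instead of treating the pageserver as broken.
            TenantMapInsertError::StillInitializing
            | TenantMapInsertError::ShuttingDown => {
                // hypothetical variant mapping to 503 Service Unavailable
                ApiError::ServiceUnavailable(anyhow::Error::new(e))
            }
            // A true conflict; 409 Conflict is already the right status code.
            TenantMapInsertError::TenantAlreadyExists(id, state) => {
                ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
            }
            TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
        }
    }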
Author:    Christian Schwarz
Date:      2023-01-19 13:29:19 +01:00
Committer: Christian Schwarz
Parent:    cd5732d9d8
Commit:    dc64962ffc

3 changed files with 179 additions and 82 deletions

File: pageserver/src/consumption_metrics.rs

@@ -115,7 +115,7 @@ pub async fn collect_metrics_iteration(
     );
     // get list of tenants
-    let tenants = mgr::list_tenants().await;
+    let tenants = mgr::list_tenants().await?;
     // iterate through list of Active tenants and collect metrics
     for (tenant_id, tenant_state) in tenants {
@@ -276,7 +276,13 @@ pub async fn calculate_synthetic_size_worker(
             },
             _ = ticker.tick() => {
-                let tenants = mgr::list_tenants().await;
+                let tenants = match mgr::list_tenants().await {
+                    Ok(tenants) => tenants,
+                    Err(e) => {
+                        warn!("cannot get tenant list: {e:#}");
+                        continue;
+                    }
+                };
                 // iterate through list of Active tenants and collect metrics
                 for (tenant_id, tenant_state) in tenants {

File: pageserver/src/http/routes.rs

@@ -16,6 +16,7 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::task_mgr::TaskKind;
 use crate::tenant::config::TenantConfOpt;
+use crate::tenant::mgr::TenantMapInsertError;
 use crate::tenant::{PageReconstructError, Timeline};
 use crate::{config::PageServerConf, tenant::mgr};
 use utils::{
@@ -99,6 +100,18 @@ fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
     }
 }

+fn apierror_from_tenant_map_insert_error(e: TenantMapInsertError) -> ApiError {
+    match e {
+        TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
+            ApiError::InternalServerError(anyhow::Error::new(e))
+        }
+        TenantMapInsertError::TenantAlreadyExists(id, state) => {
+            ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
+        }
+        TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
+    }
+}
+
 // Helper function to construct a TimelineInfo struct for a timeline
 async fn build_timeline_info(
     timeline: &Arc<Timeline>,
@@ -356,11 +369,10 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
     let state = get_state(&request);
     if let Some(remote_storage) = &state.remote_storage {
-        // FIXME: distinguish between "Tenant already exists" and other errors
         mgr::attach_tenant(state.conf, tenant_id, remote_storage.clone(), &ctx)
             .instrument(info_span!("tenant_attach", tenant = %tenant_id))
             .await
-            .map_err(ApiError::InternalServerError)?;
+            .map_err(apierror_from_tenant_map_insert_error)?;
     } else {
         return Err(ApiError::BadRequest(anyhow!(
             "attach_tenant is not possible because pageserver was configured without remote storage"
@@ -414,7 +426,7 @@ async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, A
     mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone(), &ctx)
         .instrument(info_span!("load", tenant = %tenant_id))
         .await
-        .map_err(ApiError::InternalServerError)?;
+        .map_err(apierror_from_tenant_map_insert_error)?;
     json_response(StatusCode::ACCEPTED, ())
 }
@@ -441,6 +453,8 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
     let response_data = mgr::list_tenants()
         .instrument(info_span!("tenant_list"))
         .await
+        .map_err(anyhow::Error::new)
+        .map_err(ApiError::InternalServerError)?
         .iter()
         .map(|(id, state)| TenantInfo {
             id: *id,
@@ -638,31 +652,24 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
     )
     .instrument(info_span!("tenant_create", tenant = ?target_tenant_id))
     .await
-    // FIXME: `create_tenant` can fail from both user and internal errors. Replace this
-    // with better error handling once the type permits it
-    .map_err(ApiError::InternalServerError)?;
+    .map_err(apierror_from_tenant_map_insert_error)?;

-    Ok(match new_tenant {
-        Some(tenant) => {
-            // We created the tenant. Existing API semantics are that the tenant
-            // is Active when this function returns.
-            if let res @ Err(_) = tenant.wait_to_become_active().await {
-                // This shouldn't happen because we just created the tenant directory
-                // in tenant::mgr::create_tenant, and there aren't any remote timelines
-                // to load, so, nothing can really fail during load.
-                // Don't do cleanup because we don't know how we got here.
-                // The tenant will likely be in `Broken` state and subsequent
-                // calls will fail.
-                res.context("created tenant failed to become active")
-                    .map_err(ApiError::InternalServerError)?;
-            }
-            json_response(
-                StatusCode::CREATED,
-                TenantCreateResponse(tenant.tenant_id()),
-            )?
-        }
-        None => json_response(StatusCode::CONFLICT, ())?,
-    })
+    // We created the tenant. Existing API semantics are that the tenant
+    // is Active when this function returns.
+    if let res @ Err(_) = new_tenant.wait_to_become_active().await {
+        // This shouldn't happen because we just created the tenant directory
+        // in tenant::mgr::create_tenant, and there aren't any remote timelines
+        // to load, so, nothing can really fail during load.
+        // Don't do cleanup because we don't know how we got here.
+        // The tenant will likely be in `Broken` state and subsequent
+        // calls will fail.
+        res.context("created tenant failed to become active")
+            .map_err(ApiError::InternalServerError)?;
+    }
+    json_response(
+        StatusCode::CREATED,
+        TenantCreateResponse(new_tenant.tenant_id()),
+    )
 }

 async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {

File: pageserver/src/tenant/mgr.rs

@@ -25,8 +25,35 @@ use crate::IGNORED_TENANT_FILE_NAME;
 use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};

-static TENANTS: Lazy<RwLock<HashMap<TenantId, Arc<Tenant>>>> =
-    Lazy::new(|| RwLock::new(HashMap::new()));
+/// The tenants known to the pageserver.
+/// The enum variants are used to distinguish the different states that the pageserver can be in.
+enum TenantsMap {
+    /// [`init_tenant_mgr`] is not done yet.
+    Initializing,
+    /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
+    /// New tenants can be added using [`tenant_map_insert`].
+    Open(HashMap<TenantId, Arc<Tenant>>),
+    /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
+    /// Existing tenants are still accessible, but no new tenants can be created.
+    ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
+}
+
+impl TenantsMap {
+    fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
+        match self {
+            TenantsMap::Initializing => None,
+            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
+        }
+    }
+    fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
+        match self {
+            TenantsMap::Initializing => None,
+            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
+        }
+    }
+}
+
+static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));

 /// Initialize repositories with locally available timelines.
 /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
@@ -37,9 +64,10 @@ pub async fn init_tenant_mgr(
     remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<()> {
     // Scan local filesystem for attached tenants
-    let mut number_of_tenants = 0;
     let tenants_dir = conf.tenants_path();
+    let mut tenants = HashMap::new();
+
     let mut dir_entries = fs::read_dir(&tenants_dir)
         .await
         .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
@@ -92,8 +120,7 @@ pub async fn init_tenant_mgr(
                 &ctx,
             ) {
                 Ok(tenant) => {
-                    TENANTS.write().await.insert(tenant.tenant_id(), tenant);
-                    number_of_tenants += 1;
+                    tenants.insert(tenant.tenant_id(), tenant);
                 }
                 Err(e) => {
                     error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
@@ -112,7 +139,11 @@ pub async fn init_tenant_mgr(
         }
     }

-    info!("Processed {number_of_tenants} local tenants at startup");
+    info!("Processed {} local tenants at startup", tenants.len());
+
+    let mut tenants_map = TENANTS.write().await;
+    assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
+    *tenants_map = TenantsMap::Open(tenants);

     Ok(())
 }
@@ -171,21 +202,44 @@ pub fn schedule_local_tenant_processing(
 ///
 /// Shut down all tenants. This runs as part of pageserver shutdown.
 ///
+/// NB: We leave the tenants in the map, so that they remain accessible through
+/// the management API until we shut it down. If we removed the shut-down tenants
+/// from the tenants map, the management API would return 404 for these tenants,
+/// because TenantsMap::get() now returns `None`.
+/// That could be easily misinterpreted by control plane, the consumer of the
+/// management API. For example, it could attach the tenant on a different pageserver.
+/// We would then be in split-brain once this pageserver restarts.
 pub async fn shutdown_all_tenants() {
+    // Prevent new tenants from being created.
     let tenants_to_shut_down = {
         let mut m = TENANTS.write().await;
-        let mut tenants_to_shut_down = Vec::with_capacity(m.len());
-        for (_, tenant) in m.drain() {
-            if tenant.is_active() {
-                // updates tenant state, forbidding new GC and compaction iterations from starting
-                tenant.set_stopping();
-                tenants_to_shut_down.push(tenant)
+        match &mut *m {
+            TenantsMap::Initializing => {
+                *m = TenantsMap::ShuttingDown(HashMap::default());
+                info!("tenants map is empty");
+                return;
+            }
+            TenantsMap::Open(tenants) => {
+                let tenants_clone = tenants.clone();
+                *m = TenantsMap::ShuttingDown(std::mem::take(tenants));
+                tenants_clone
+            }
+            TenantsMap::ShuttingDown(_) => {
+                error!("already shutting down, this function isn't supposed to be called more than once");
+                return;
             }
         }
-        drop(m);
-        tenants_to_shut_down
     };
+
+    let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
+    for (_, tenant) in tenants_to_shut_down {
+        if tenant.is_active() {
+            // updates tenant state, forbidding new GC and compaction iterations from starting
+            tenant.set_stopping();
+            tenants_to_freeze_and_flush.push(tenant);
+        }
+    }

     // Shut down all existing walreceiver connections and stop accepting the new ones.
     task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
@@ -197,7 +251,7 @@ pub async fn shutdown_all_tenants() {
     // should be no more activity in any of the repositories.
     //
     // On error, log it but continue with the shutdown for other tenants.
-    for tenant in tenants_to_shut_down {
+    for tenant in tenants_to_freeze_and_flush {
         let tenant_id = tenant.tenant_id();
         debug!("shutdown tenant {tenant_id}");
@@ -213,27 +267,22 @@ pub async fn create_tenant(
     tenant_id: TenantId,
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
-) -> anyhow::Result<Option<Arc<Tenant>>> {
-    match TENANTS.write().await.entry(tenant_id) {
-        hash_map::Entry::Occupied(_) => {
-            debug!("tenant {tenant_id} already exists");
-            Ok(None)
-        }
-        hash_map::Entry::Vacant(v) => {
-            // Hold the write_tenants() lock, since all of this is local IO.
-            // If this section ever becomes contentious, introduce a new `TenantState::Creating`.
-            let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id)?;
-            let created_tenant =
-                schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
-            let crated_tenant_id = created_tenant.tenant_id();
-            anyhow::ensure!(
+) -> Result<Arc<Tenant>, TenantMapInsertError> {
+    tenant_map_insert(tenant_id, |vacant_entry| {
+        // We're holding the tenants lock in write mode while doing local IO.
+        // If this section ever becomes contentious, introduce a new `TenantState::Creating`
+        // and do the work in that state.
+        let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id)?;
+        let created_tenant =
+            schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
+        let crated_tenant_id = created_tenant.tenant_id();
+        anyhow::ensure!(
             tenant_id == crated_tenant_id,
             "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
         );
-            v.insert(Arc::clone(&created_tenant));
-            Ok(Some(created_tenant))
-        }
-    }
+        vacant_entry.insert(Arc::clone(&created_tenant));
+        Ok(created_tenant)
+    }).await
 }

 pub async fn update_tenant_config(
@@ -302,8 +351,8 @@ pub async fn load_tenant(
     tenant_id: TenantId,
     remote_storage: Option<GenericRemoteStorage>,
     ctx: &RequestContext,
-) -> anyhow::Result<()> {
-    run_if_no_tenant_in_memory(tenant_id, |vacant_entry| {
+) -> Result<(), TenantMapInsertError> {
+    tenant_map_insert(tenant_id, |vacant_entry| {
         let tenant_path = conf.tenant_path(&tenant_id);
         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(tenant_id);
         if tenant_ignore_mark.exists() {
@@ -340,16 +389,24 @@ pub async fn ignore_tenant(
     .await
 }

+#[derive(Debug, thiserror::Error)]
+pub enum TenantMapListError {
+    #[error("tenant map is still initializing")]
+    Initializing,
+}
+
 ///
 /// Get list of tenants, for the mgmt API
 ///
-pub async fn list_tenants() -> Vec<(TenantId, TenantState)> {
-    TENANTS
-        .read()
-        .await
-        .iter()
+pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
+    let tenants = TENANTS.read().await;
+    let m = match &*tenants {
+        TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
+        TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
+    };
+    Ok(m.iter()
         .map(|(id, tenant)| (*id, tenant.current_state()))
-        .collect()
+        .collect())
 }

 /// Execute Attach mgmt API command.
@@ -361,8 +418,8 @@ pub async fn attach_tenant(
     tenant_id: TenantId,
     remote_storage: GenericRemoteStorage,
     ctx: &RequestContext,
-) -> anyhow::Result<()> {
-    run_if_no_tenant_in_memory(tenant_id, |vacant_entry| {
+) -> Result<(), TenantMapInsertError> {
+    tenant_map_insert(tenant_id, |vacant_entry| {
         let tenant_path = conf.tenant_path(&tenant_id);
         anyhow::ensure!(
             !tenant_path.exists(),
@@ -371,24 +428,51 @@ pub async fn attach_tenant(
         let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx);
         vacant_entry.insert(tenant);
         Ok(())
     })
     .await
 }

-async fn run_if_no_tenant_in_memory<F, V>(tenant_id: TenantId, run: F) -> anyhow::Result<V>
+#[derive(Debug, thiserror::Error)]
+pub enum TenantMapInsertError {
+    #[error("tenant map is still initializing")]
+    StillInitializing,
+    #[error("tenant map is shutting down")]
+    ShuttingDown,
+    #[error("tenant {0} already exists, state: {1:?}")]
+    TenantAlreadyExists(TenantId, TenantState),
+    #[error(transparent)]
+    Closure(#[from] anyhow::Error),
+}
+
+/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that
+/// entry is vacant. The closure is responsible for creating the tenant object and inserting
+/// it into the tenants map through the vacant entry that it receives as argument.
+///
+/// NB: the closure should return quickly because the current implementation of tenants map
+/// serializes access through an `RwLock`.
+async fn tenant_map_insert<F, V>(
+    tenant_id: TenantId,
+    insert_fn: F,
+) -> Result<V, TenantMapInsertError>
 where
     F: FnOnce(hash_map::VacantEntry<TenantId, Arc<Tenant>>) -> anyhow::Result<V>,
 {
-    match TENANTS.write().await.entry(tenant_id) {
-        hash_map::Entry::Occupied(e) => {
-            anyhow::bail!(
-                "tenant {tenant_id} already exists, state: {:?}",
-                e.get().current_state()
-            )
-        }
-        hash_map::Entry::Vacant(v) => run(v),
+    let mut guard = TENANTS.write().await;
+    let m = match &mut *guard {
+        TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
+        TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
+        TenantsMap::Open(m) => m,
+    };
+    match m.entry(tenant_id) {
+        hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
+            tenant_id,
+            e.get().current_state(),
+        )),
+        hash_map::Entry::Vacant(v) => match insert_fn(v) {
+            Ok(v) => Ok(v),
+            Err(e) => Err(TenantMapInsertError::Closure(e)),
+        },
     }
 }