pageserver: add InProgress top level state & make TenantsMap lock synchronous

This commit is contained in:
John Spray
2023-10-10 12:34:48 +01:00
parent 9c35e1e6e5
commit c5300e55f6
10 changed files with 654 additions and 405 deletions

View File

@@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker(
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
if let Ok(tenant) = mgr::get_tenant(tenant_id, true) {
// TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
// We can put in some prioritization for consumption metrics.
// Same for the loop that fetches computed metrics.

View File

@@ -206,7 +206,6 @@ pub(super) async fn collect_all_metrics(
None
} else {
crate::tenant::mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}

View File

@@ -0,0 +1,2 @@
Checking pageserver v0.1.0 (/home/neon/neon/pageserver)
Finished dev [optimized + debuginfo] target(s) in 7.62s

View File

@@ -545,7 +545,7 @@ async fn collect_eviction_candidates(
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it

View File

@@ -36,7 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
@@ -147,28 +148,60 @@ impl From<PageReconstructError> for ApiError {
impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}").into())
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
}
TenantMapInsertError::SlotError(e) => e.into(),
TenantMapInsertError::SlotUpsertError(e) => e.into(),
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}
impl From<TenantSlotError> for ApiError {
fn from(e: TenantSlotError) -> ApiError {
use TenantSlotError::*;
match e {
NotFound(tenant_id) => {
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
}
e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
e @ Conflict(_) => ApiError::Conflict(format!("{e}")),
InProgress => {
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
}
MapState(e) => e.into(),
}
}
}
impl From<TenantSlotUpsertError> for ApiError {
fn from(e: TenantSlotUpsertError) -> ApiError {
use TenantSlotUpsertError::*;
match e {
InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
MapState(e) => e.into(),
}
}
}
impl From<TenantMapError> for ApiError {
fn from(e: TenantMapError) -> ApiError {
use TenantMapError::*;
match e {
StillInitializing | ShuttingDown => {
ApiError::ResourceUnavailable(format!("{e}").into())
}
}
}
}
impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
TenantStateError::SlotError(e) => e.into(),
TenantStateError::SlotUpsertError(e) => e.into(),
TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
}
}
}
@@ -243,6 +276,9 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
SlotError(e) => e.into(),
SlotUpsertError(e) => e.into(),
Other(o) => ApiError::InternalServerError(o),
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
}
@@ -369,7 +405,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -416,7 +452,7 @@ async fn timeline_list_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
let timelines = tenant.list_timelines();
let mut response_data = Vec::with_capacity(timelines.len());
@@ -455,7 +491,7 @@ async fn timeline_detail_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
let timeline = tenant
.get_timeline(timeline_id, false)
@@ -713,7 +749,7 @@ async fn tenant_status(
check_permission(&request, Some(tenant_id))?;
let tenant_info = async {
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
@@ -776,7 +812,7 @@ async fn tenant_size_handler(
let headers = request.headers();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
// this can be long operation
let inputs = tenant
@@ -1035,7 +1071,7 @@ async fn get_tenant_config_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;
let response = HashMap::from([
(
@@ -1094,7 +1130,7 @@ async fn put_tenant_location_config_handler(
.await
{
match e {
TenantStateError::NotFound(_) => {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
// This API is idempotent: a NotFound on a detach is fine.
}
_ => return Err(e.into()),
@@ -1132,7 +1168,6 @@ async fn handle_tenant_break(
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned()).await;
@@ -1437,7 +1472,7 @@ async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))

View File

@@ -1314,7 +1314,7 @@ async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false).await {
let tenant = match mgr::get_tenant(tenant_id, false) {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {

View File

@@ -254,6 +254,12 @@ pub struct Tenant {
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
}
impl std::fmt::Debug for Tenant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.tenant_id, self.current_state())
}
}
pub(crate) enum WalRedoManager {
Prod(PostgresRedoManager),
#[cfg(test)]
@@ -526,7 +532,7 @@ impl Tenant {
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -1833,6 +1839,7 @@ impl Tenant {
}
Err(SetStoppingError::AlreadyStopping(other)) => {
// give caller the option to wait for this this shutdown
info!("Tenant::shutdown: AlreadyStopping");
return Err(other);
}
};
@@ -2110,6 +2117,9 @@ where
}
impl Tenant {
pub fn get_tenant_id(&self) -> TenantId {
self.tenant_id
}
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
}
@@ -4236,11 +4246,7 @@ mod tests {
metadata_bytes[8] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness
.try_load_local(&ctx)
.await
.err()
.expect("should fail");
let err = harness.try_load_local(&ctx).await.expect_err("should fail");
// get all the stack with all .context, not only the last one
let message = format!("{err:#}");
let expected = "failed to load metadata";

View File

@@ -21,7 +21,7 @@ use crate::{
};
use super::{
mgr::{GetTenantError, TenantsMap},
mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap},
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span,
timeline::delete::DeleteTimelineFlow,
@@ -33,12 +33,21 @@ pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
#[error("Tenant not attached")]
NotAttached,
#[error("Invalid state {0}. Expected Active or Broken")]
InvalidState(TenantState),
#[error("Tenant deletion is already in progress")]
AlreadyInProgress,
#[error("Tenant map slot error {0}")]
SlotError(#[from] TenantSlotError),
#[error("Tenant map slot upsert error {0}")]
SlotUpsertError(#[from] TenantSlotUpsertError),
#[error("Timeline {0}")]
Timeline(#[from] DeleteTimelineError),
@@ -273,12 +282,12 @@ impl DeleteTenantFlow {
pub(crate) async fn run(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
span::debug_assert_current_span_has_tenant_id();
let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?;
let mut guard = Self::prepare(&tenant).await?;
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
tenant.set_broken(format!("{e:#}")).await;
@@ -378,7 +387,7 @@ impl DeleteTenantFlow {
guard: DeletionGuard,
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
@@ -405,15 +414,8 @@ impl DeleteTenantFlow {
}
async fn prepare(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(Arc<Tenant>, tokio::sync::OwnedMutexGuard<Self>), DeleteTenantError> {
let m = tenants.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
tenant: &Arc<Tenant>,
) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
// so at least for now allow deletions only for active tenants. TODO recheck
// Broken and Stopping is needed for retries.
@@ -447,14 +449,14 @@ impl DeleteTenantFlow {
)));
}
Ok((Arc::clone(tenant), guard))
Ok(guard)
}
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
let tenant_id = tenant.tenant_id;
@@ -487,7 +489,7 @@ impl DeleteTenantFlow {
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
// Tree sort timelines, schedule delete for them. Mention retries from the console side.
@@ -535,7 +537,7 @@ impl DeleteTenantFlow {
.await
.context("cleanup_remaining_fs_traces")?;
let mut locked = tenants.write().await;
let mut locked = tenants.write().unwrap();
if locked.remove(&tenant.tenant_id).is_none() {
warn!("Tenant got removed from tenants map during deletion");
};

File diff suppressed because it is too large Load Diff

View File

@@ -344,20 +344,7 @@ impl Timeline {
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
//
// It is critical we are responsive to cancellation here. Otherwise, we deadlock with
// tenant deletion (holds TENANTS in read mode) any other task that attempts to
// acquire TENANTS in write mode before we here call get_tenant.
// See https://github.com/neondatabase/neon/issues/5284.
let res = tokio::select! {
_ = cancel.cancelled() => {
return ControlFlow::Break(());
}
res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => {
res
}
};
let tenant = match res {
let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) {
Ok(t) => t,
Err(_) => {
return ControlFlow::Break(());