mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
pageserver: refactoring in TenantManager to reduce duplication (#6732)
## Problem Followup to https://github.com/neondatabase/neon/pull/6725 In that PR, code for purging local files from a tenant shard was duplicated. ## Summary of changes - Refactor detach code into TenantManager - `spawn_background_purge` method can now be common between detach and split operations
This commit is contained in:
@@ -885,14 +885,16 @@ async fn tenant_detach_handler(
|
||||
|
||||
let state = get_state(&request);
|
||||
let conf = state.conf;
|
||||
mgr::detach_tenant(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
detach_ignored.unwrap_or(false),
|
||||
&state.deletion_queue_client,
|
||||
)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
|
||||
.await?;
|
||||
state
|
||||
.tenant_manager
|
||||
.detach_tenant(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
detach_ignored.unwrap_or(false),
|
||||
&state.deletion_queue_client,
|
||||
)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -1403,7 +1405,9 @@ async fn update_tenant_config_handler(
|
||||
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
let state = get_state(&request);
|
||||
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
state
|
||||
.tenant_manager
|
||||
.set_new_tenant_config(tenant_conf, tenant_id)
|
||||
.instrument(info_span!("tenant_config", %tenant_id))
|
||||
.await?;
|
||||
|
||||
@@ -1428,13 +1432,14 @@ async fn put_tenant_location_config_handler(
|
||||
// The `Detached` state is special, it doesn't upsert a tenant, it removes
|
||||
// its local disk content and drops it from memory.
|
||||
if let LocationConfigMode::Detached = request_data.config.mode {
|
||||
if let Err(e) =
|
||||
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
|
||||
.instrument(info_span!("tenant_detach",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug()
|
||||
))
|
||||
.await
|
||||
if let Err(e) = state
|
||||
.tenant_manager
|
||||
.detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
|
||||
.instrument(info_span!("tenant_detach",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug()
|
||||
))
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
|
||||
|
||||
@@ -633,7 +633,7 @@ pub async fn init_tenant_mgr(
|
||||
/// Wrapper for Tenant::spawn that checks invariants before running, and inserts
|
||||
/// a broken tenant in the map if Tenant::spawn fails.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn tenant_spawn(
|
||||
fn tenant_spawn(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
tenant_path: &Utf8Path,
|
||||
@@ -825,40 +825,6 @@ pub(crate) enum SetNewTenantConfigError {
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), SetNewTenantConfigError> {
|
||||
// Legacy API: does not support sharding
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
if !tenant.tenant_shard_id().shard_count.is_unsharded() {
|
||||
// Note that we use ShardParameters::default below.
|
||||
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
|
||||
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
|
||||
)));
|
||||
}
|
||||
|
||||
// This is a legacy API that only operates on attached tenants: the preferred
|
||||
// API to use is the location_config/ endpoint, which lets the caller provide
|
||||
// the full LocationConf.
|
||||
let location_conf = LocationConf::attached_single(
|
||||
new_tenant_conf.clone(),
|
||||
tenant.generation,
|
||||
&ShardParameters::default(),
|
||||
);
|
||||
|
||||
Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum UpsertLocationError {
|
||||
#[error("Bad config request: {0}")]
|
||||
@@ -1661,19 +1627,7 @@ impl TenantManager {
|
||||
let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
|
||||
.await
|
||||
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MgmtRequest,
|
||||
None,
|
||||
None,
|
||||
"tenant_files_delete",
|
||||
false,
|
||||
async move {
|
||||
fs::remove_dir_all(tmp_path.as_path())
|
||||
.await
|
||||
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
|
||||
},
|
||||
);
|
||||
self.spawn_background_purge(tmp_path);
|
||||
|
||||
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
@@ -1827,6 +1781,134 @@ impl TenantManager {
|
||||
|
||||
shutdown_all_tenants0(self.tenants).await
|
||||
}
|
||||
|
||||
/// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
|
||||
/// the background, and thereby avoid blocking any API requests on this deletion completing.
|
||||
fn spawn_background_purge(&self, tmp_path: Utf8PathBuf) {
|
||||
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
|
||||
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
|
||||
let task_tenant_id = None;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MgmtRequest,
|
||||
task_tenant_id,
|
||||
None,
|
||||
"tenant_files_delete",
|
||||
false,
|
||||
async move {
|
||||
fs::remove_dir_all(tmp_path.as_path())
|
||||
.await
|
||||
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) async fn detach_tenant(
|
||||
&self,
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tmp_path = self
|
||||
.detach_tenant0(
|
||||
conf,
|
||||
&TENANTS,
|
||||
tenant_shard_id,
|
||||
detach_ignored,
|
||||
deletion_queue_client,
|
||||
)
|
||||
.await?;
|
||||
self.spawn_background_purge(tmp_path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn detach_tenant0(
|
||||
&self,
|
||||
conf: &'static PageServerConf,
|
||||
tenants: &std::sync::RwLock<TenantsMap>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<Utf8PathBuf, TenantStateError> {
|
||||
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
|
||||
safe_rename_tenant_dir(&local_tenant_directory)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("local tenant directory {local_tenant_directory:?} rename")
|
||||
})
|
||||
};
|
||||
|
||||
let removal_result = remove_tenant_from_memory(
|
||||
tenants,
|
||||
tenant_shard_id,
|
||||
tenant_dir_rename_operation(tenant_shard_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Flush pending deletions, so that they have a good chance of passing validation
|
||||
// before this tenant is potentially re-attached elsewhere.
|
||||
deletion_queue_client.flush_advisory();
|
||||
|
||||
// Ignored tenants are not present in memory and will bail the removal from memory operation.
|
||||
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
|
||||
if detach_ignored
|
||||
&& matches!(
|
||||
removal_result,
|
||||
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
|
||||
)
|
||||
{
|
||||
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
|
||||
if tenant_ignore_mark.exists() {
|
||||
info!("Detaching an ignored tenant");
|
||||
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Ignored tenant {tenant_shard_id} local directory rename")
|
||||
})?;
|
||||
return Ok(tmp_path);
|
||||
}
|
||||
}
|
||||
|
||||
removal_result
|
||||
}
|
||||
|
||||
pub(crate) async fn set_new_tenant_config(
|
||||
&self,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), SetNewTenantConfigError> {
|
||||
// Legacy API: does not support sharding
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
if !tenant.tenant_shard_id().shard_count.is_unsharded() {
|
||||
// Note that we use ShardParameters::default below.
|
||||
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
|
||||
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
|
||||
)));
|
||||
}
|
||||
|
||||
// This is a legacy API that only operates on attached tenants: the preferred
|
||||
// API to use is the location_config/ endpoint, which lets the caller provide
|
||||
// the full LocationConf.
|
||||
let location_conf = LocationConf::attached_single(
|
||||
new_tenant_conf.clone(),
|
||||
tenant.generation,
|
||||
&ShardParameters::default(),
|
||||
);
|
||||
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &location_conf)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -2028,87 +2110,6 @@ pub(crate) enum TenantStateError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tmp_path = detach_tenant0(
|
||||
conf,
|
||||
&TENANTS,
|
||||
tenant_shard_id,
|
||||
detach_ignored,
|
||||
deletion_queue_client,
|
||||
)
|
||||
.await?;
|
||||
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
|
||||
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
|
||||
let task_tenant_id = None;
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MgmtRequest,
|
||||
task_tenant_id,
|
||||
None,
|
||||
"tenant_files_delete",
|
||||
false,
|
||||
async move {
|
||||
fs::remove_dir_all(tmp_path.as_path())
|
||||
.await
|
||||
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
|
||||
},
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn detach_tenant0(
|
||||
conf: &'static PageServerConf,
|
||||
tenants: &std::sync::RwLock<TenantsMap>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<Utf8PathBuf, TenantStateError> {
|
||||
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
|
||||
safe_rename_tenant_dir(&local_tenant_directory)
|
||||
.await
|
||||
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
|
||||
};
|
||||
|
||||
let removal_result = remove_tenant_from_memory(
|
||||
tenants,
|
||||
tenant_shard_id,
|
||||
tenant_dir_rename_operation(tenant_shard_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Flush pending deletions, so that they have a good chance of passing validation
|
||||
// before this tenant is potentially re-attached elsewhere.
|
||||
deletion_queue_client.flush_advisory();
|
||||
|
||||
// Ignored tenants are not present in memory and will bail the removal from memory operation.
|
||||
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
|
||||
if detach_ignored
|
||||
&& matches!(
|
||||
removal_result,
|
||||
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
|
||||
)
|
||||
{
|
||||
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
|
||||
if tenant_ignore_mark.exists() {
|
||||
info!("Detaching an ignored tenant");
|
||||
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Ignored tenant {tenant_shard_id} local directory rename")
|
||||
})?;
|
||||
return Ok(tmp_path);
|
||||
}
|
||||
}
|
||||
|
||||
removal_result
|
||||
}
|
||||
|
||||
pub(crate) async fn load_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
|
||||
Reference in New Issue
Block a user