pageserver: start refactoring into TenantManager

John Spray
2023-10-11 13:18:04 +01:00
parent ead1931167
commit d422105d88
3 changed files with 46 additions and 40 deletions
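The diffs below move tenant lifecycle operations from free functions in `tenant::mgr` — each of which took `conf`, the global `TENANTS` map, and shared resources as parameters — onto a new `TenantManager` struct that owns those dependencies. A minimal, self-contained sketch of that pattern (the types here are stand-ins, not the real pageserver types):

```rust
use std::sync::{Arc, RwLock};

struct Conf; // stand-in for &'static PageServerConf
struct TenantsMap; // stand-in for the tenant slot map
#[derive(Clone, Default)]
struct SharedResources; // stand-in for TenantSharedResources

struct TenantManager {
    conf: &'static Conf,
    tenants: &'static RwLock<TenantsMap>,
    resources: SharedResources,
}

impl TenantManager {
    // Methods borrow conf/tenants/resources from `self`, so callers no
    // longer thread them through every call.
    fn upsert_location(&self, tenant_id: u64) {
        let _conf = self.conf;
        let _map = self.tenants.read().unwrap();
        let _resources = &self.resources;
        println!("configuring tenant {tenant_id}");
    }
}

static CONF: Conf = Conf;
static TENANTS: RwLock<TenantsMap> = RwLock::new(TenantsMap);

fn main() {
    // init_tenant_mgr-style construction: build once, share via Arc.
    let mgr = Arc::new(TenantManager {
        conf: &CONF,
        tenants: &TENANTS,
        resources: SharedResources::default(),
    });
    mgr.upsert_location(42);
}
```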

pageserver/src/bin/pageserver.rs

@@ -408,7 +408,7 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
-BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
+let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -418,6 +418,7 @@ fn start_pageserver(
order,
shutdown_pageserver.clone(),
))?;
+let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
@@ -546,6 +547,7 @@ fn start_pageserver(
let router_state = Arc::new(
http::routes::State::new(
conf,
+tenant_manager,
http_auth.clone(),
remote_storage.clone(),
broker_client.clone(),
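A note on the `Arc::new` above, hedged since only part of the startup path is shown: the single `TenantManager` built by `init_tenant_mgr` ends up shared between the HTTP router state and the background runtime, and `Arc` is the standard way to share one value across threads. A stand-alone illustration:

```rust
use std::sync::Arc;

struct TenantManager; // stand-in for the real type

fn main() {
    // Built once at startup; every consumer gets a cheap clone of the Arc,
    // all pointing at the same TenantManager.
    let tenant_manager = Arc::new(TenantManager);

    let for_http = Arc::clone(&tenant_manager); // handed to the router State
    let worker = std::thread::spawn(move || {
        let _mgr: &TenantManager = &for_http; // used from another thread/task
    });
    worker.join().unwrap();
}
```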

pageserver/src/http/routes.rs

@@ -36,8 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
-GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError,
-TenantSlotUpsertError, TenantStateError,
+GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
+TenantSlotError, TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
@@ -64,6 +64,7 @@ use super::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
+tenant_manager: Arc<TenantManager>,
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
@@ -75,6 +76,7 @@ pub struct State {
impl State {
pub fn new(
conf: &'static PageServerConf,
+tenant_manager: Arc<TenantManager>,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
@@ -87,6 +89,7 @@ impl State {
.collect::<Vec<_>>();
Ok(Self {
conf,
+tenant_manager,
auth,
allowlist_routes,
remote_storage,
@@ -1142,20 +1145,14 @@ async fn put_tenant_location_config_handler(
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
-mgr::upsert_location(
-state.conf,
-tenant_id,
-location_conf,
-state.broker_client.clone(),
-state.remote_storage.clone(),
-state.deletion_queue_client.clone(),
-&ctx,
-)
-.await
-// TODO: badrequest assumes the caller was asking for something unreasonable, but in
-// principle we might have hit something like concurrent API calls to the same tenant,
-// which is not a 400 but a 409.
-.map_err(ApiError::BadRequest)?;
+state
+.tenant_manager
+.upsert_location(tenant_id, location_conf, &ctx)
+.await
+// TODO: badrequest assumes the caller was asking for something unreasonable, but in
+// principle we might have hit something like concurrent API calls to the same tenant,
+// which is not a 400 but a 409.
+.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
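After this change, handlers reach tenant operations through the shared `State` instead of calling `mgr::` free functions. A simplified, synchronous sketch of that shape (the real handlers are async and use the pageserver's `ApiError` types; the names below are stand-ins):

```rust
use std::sync::Arc;

struct TenantManager;

impl TenantManager {
    fn upsert_location(&self, _tenant_id: u64) -> Result<(), String> {
        Ok(())
    }
}

struct State {
    tenant_manager: Arc<TenantManager>,
}

fn put_tenant_location_config_handler(state: &State, tenant_id: u64) -> Result<(), String> {
    // The handler no longer gathers conf, broker client, remote storage and
    // deletion queue itself; the manager owns them. As the TODO in the diff
    // notes, mapping every failure to 400 is lossy: a conflict between
    // concurrent calls to the same tenant is really a 409.
    state.tenant_manager.upsert_location(tenant_id)
}

fn main() {
    let state = State { tenant_manager: Arc::new(TenantManager) };
    put_tenant_location_config_handler(&state, 1).unwrap();
}
```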

pageserver/src/tenant/mgr.rs

@@ -184,6 +184,12 @@ async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
Ok(())
}
+pub struct TenantManager {
+conf: &'static PageServerConf,
+tenants: &'static std::sync::RwLock<TenantsMap>,
+resources: TenantSharedResources,
+}
fn emergency_generations(
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
) -> HashMap<TenantId, Generation> {
@@ -350,7 +356,7 @@ pub async fn init_tenant_mgr(
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<TenantManager> {
let mut tenants = HashMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
@@ -450,7 +456,12 @@ pub async fn init_tenant_mgr(
let mut tenants_map = TENANTS.write().unwrap();
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
*tenants_map = TenantsMap::Open(tenants);
-Ok(())
+Ok(TenantManager {
+conf,
+tenants: &TENANTS,
+resources,
+})
}
#[allow(clippy::too_many_arguments)]
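One transitional detail worth calling out: `TenantManager` holds `tenants: &'static RwLock<TenantsMap>` pointing at the process-global `TENANTS` rather than owning the map, so the not-yet-converted free functions and the new methods keep observing the same state. A stand-alone sketch of that aliasing, with a `Vec<u64>` standing in for `TenantsMap`:

```rust
use std::sync::RwLock;

// The process-global map, still used directly by unconverted free functions.
static TENANTS: RwLock<Vec<u64>> = RwLock::new(Vec::new());

struct TenantManager {
    tenants: &'static RwLock<Vec<u64>>,
}

// A legacy-style free function mutating the global directly...
fn legacy_insert(tenant_id: u64) {
    TENANTS.write().unwrap().push(tenant_id);
}

fn main() {
    let mgr = TenantManager { tenants: &TENANTS };
    legacy_insert(7);
    // ...and the manager sees the same data through its borrowed reference.
    assert_eq!(*mgr.tenants.read().unwrap(), vec![7]);
}
```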
@@ -750,14 +761,13 @@ pub(crate) async fn set_new_tenant_config(
Ok(())
}
+impl TenantManager {
#[instrument(skip_all, fields(%tenant_id))]
pub(crate) async fn upsert_location(
-conf: &'static PageServerConf,
+&self,
tenant_id: TenantId,
new_location_config: LocationConf,
-broker_client: storage_broker::BrokerClientChannel,
-remote_storage: Option<GenericRemoteStorage>,
-deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
@@ -766,7 +776,7 @@ pub(crate) async fn upsert_location(
// then we do not need to set the slot to InProgress, we can just call into the
// existing tenant.
{
-let locked = TENANTS.read().unwrap();
+let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id)?;
match (&new_location_config.mode, peek_slot) {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
@@ -778,9 +788,9 @@ pub(crate) async fn upsert_location(
new_location_config,
)?);
-// Persist the new config in the background, to avoid holding up any
-// locks while we do so.
-// TODO
+// Persist the new config in the background, to avoid holding up any
+// locks while we do so.
+// TODO
return Ok(());
} else {
@@ -811,7 +821,7 @@ pub(crate) async fn upsert_location(
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
-deletion_queue_client.flush_advisory()
+self.resources.deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's no point trying to flush deletions
@@ -830,21 +840,21 @@ pub(crate) async fn upsert_location(
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
-let tenant_path = conf.tenant_path(&tenant_id);
+let tenant_path = self.conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
-Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
+Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => {
-let timelines_path = conf.timelines_path(&tenant_id);
+let timelines_path = self.conf.timelines_path(&tenant_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
@@ -853,20 +863,16 @@ pub(crate) async fn upsert_location(
.await
.with_context(|| format!("Creating {timelines_path}"))?;
-Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
+Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant = match Tenant::spawn_attach(
-conf,
+self.conf,
tenant_id,
-TenantSharedResources {
-broker_client,
-remote_storage,
-deletion_queue_client,
-},
+self.resources.clone(),
AttachedTenantConf::try_from(new_location_config)?,
-&TENANTS,
+self.tenants,
// The LocationConf API does not use marker files, because we have Secondary
// locations where the directory's existence is not a signal that it contains
// all timelines. See https://github.com/neondatabase/neon/issues/5550
@@ -876,7 +882,7 @@ pub(crate) async fn upsert_location(
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
-Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
+Tenant::create_broken_tenant(self.conf, tenant_id, format!("{e:#}"))
}
};
@@ -887,6 +893,7 @@ pub(crate) async fn upsert_location(
Ok(())
}
+}
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetTenantError {