Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 20:42:54 +00:00
storage controller: eliminate ensure_attached (#8875)
## Problem

This is a followup to #8783:

- The old blocking `ensure_attached` function had been retained to handle the case where a shard had a `None` `generation_pageserver`, but this wasn't really necessary.
- There was a subtle `.1` in the code where a struct would have been clearer.

Closes #8819

## Summary of changes

- Add `ShardGenerationState` to represent the results of `peek_generations`.
- Instead of calling `ensure_attached` when a tenant has a non-attached shard, check the shard's policy: return 409 if it isn't Attached, or 503 if the policy is Attached but the shard hasn't been reconciled yet (i.e. has a `None` `generation_pageserver`).
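As a reader aid (not part of this patch; all names below are illustrative), a minimal sketch of how a caller of the storage controller's tenant/timeline APIs might react to the two status codes introduced here: retry on 503, since the shards are meant to be attached and just haven't reconciled yet, and give up on 409, since the placement policy is Secondary or Detached.

```rust
// Illustrative caller-side handling of the 409/503 split described above.
use std::time::Duration;

enum NextStep {
    /// 503: shards are meant to be attached but not yet reconciled; retry later.
    Retry(Duration),
    /// 409: placement policy is Secondary or Detached; retrying will not help.
    GiveUp(String),
    /// Any other status is outside the scope of this sketch.
    Other(u16),
}

fn interpret_attach_error(status: u16, body: &str) -> NextStep {
    match status {
        503 => NextStep::Retry(Duration::from_millis(500)),
        409 => NextStep::GiveUp(body.to_string()),
        other => NextStep::Other(other),
    }
}

fn main() {
    assert!(matches!(interpret_attach_error(503, ""), NextStep::Retry(_)));
    assert!(matches!(
        interpret_attach_error(409, "policy is Detached"),
        NextStep::GiveUp(_)
    ));
}
```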
@@ -122,6 +122,13 @@ pub(crate) enum TenantFilter {
     Shard(TenantShardId),
 }
 
+/// Represents the results of looking up generation+pageserver for the shards of a tenant
+pub(crate) struct ShardGenerationState {
+    pub(crate) tenant_shard_id: TenantShardId,
+    pub(crate) generation: Option<Generation>,
+    pub(crate) generation_pageserver: Option<NodeId>,
+}
+
 impl Persistence {
     // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
     // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
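The `.1` readability point from the commit message is easiest to see side by side. Below is a self-contained sketch with simplified stand-in types (plain `String`/`u32`/`u64` instead of the real `TenantShardId`/`Generation`/`NodeId` newtypes); it is illustrative only, not code from the patch.

```rust
// Simplified stand-ins for illustration; not the real crate types.
struct ShardGenerationState {
    tenant_shard_id: String,
    generation: Option<u32>,
    generation_pageserver: Option<u64>,
}

// Before: the reader has to remember that `.1` is the generation.
fn any_unattached_tuple(shards: &[(String, Option<u32>, Option<u64>)]) -> bool {
    shards.iter().any(|i| i.1.is_none())
}

// After: the field names carry the meaning.
fn any_unattached_struct(shards: &[ShardGenerationState]) -> bool {
    shards
        .iter()
        .any(|i| i.generation.is_none() || i.generation_pageserver.is_none())
}

fn main() {
    let shards = vec![ShardGenerationState {
        tenant_shard_id: "shard-0".to_string(),
        generation: None,
        generation_pageserver: None,
    }];
    assert!(any_unattached_struct(&shards));
    assert!(any_unattached_tuple(&[("shard-0".to_string(), None, None)]));
}
```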
@@ -540,7 +547,7 @@ impl Persistence {
     pub(crate) async fn peek_generations(
         &self,
         filter_tenant_id: TenantId,
-    ) -> Result<Vec<(TenantShardId, Option<Generation>, Option<NodeId>)>, DatabaseError> {
+    ) -> Result<Vec<ShardGenerationState>, DatabaseError> {
         use crate::schema::tenant_shards::dsl::*;
         let rows = self
             .with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
@@ -555,13 +562,12 @@ impl Persistence {
 
         Ok(rows
             .into_iter()
-            .map(|p| {
-                (
-                    p.get_tenant_shard_id()
-                        .expect("Corrupt tenant shard id in database"),
-                    p.generation.map(|g| Generation::new(g as u32)),
-                    p.generation_pageserver.map(|n| NodeId(n as u64)),
-                )
+            .map(|p| ShardGenerationState {
+                tenant_shard_id: p
+                    .get_tenant_shard_id()
+                    .expect("Corrupt tenant shard id in database"),
+                generation: p.generation.map(|g| Generation::new(g as u32)),
+                generation_pageserver: p.generation_pageserver.map(|n| NodeId(n as u64)),
             })
             .collect())
     }
@@ -22,7 +22,7 @@ use crate::{
     peer_client::GlobalObservedState,
     persistence::{
         AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
-        TenantFilter,
+        ShardGenerationState, TenantFilter,
     },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
     scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
@@ -3106,20 +3106,44 @@ impl Service {
         // will still be the latest when we're done: we will check generations again at the end of
         // this function to handle that.
         let generations = self.persistence.peek_generations(tenant_id).await?;
-        let generations = if generations.iter().any(|i| i.1.is_none()) {
-            // One or more shards is not attached to anything: maybe this is a new tenant? Wait for
-            // it to reconcile.
-            self.ensure_attached_wait(tenant_id).await?;
-            self.persistence.peek_generations(tenant_id).await?
-        } else {
-            generations
-        };
 
+        if generations
+            .iter()
+            .any(|i| i.generation.is_none() || i.generation_pageserver.is_none())
+        {
+            // One or more shards has not been attached to a pageserver. Check if this is because it's configured
+            // to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry)
+            let locked = self.inner.read().unwrap();
+            for (shard_id, shard) in
+                locked.tenants.range(TenantShardId::tenant_range(tenant_id))
+            {
+                match shard.policy {
+                    PlacementPolicy::Attached(_) => {
+                        // This shard is meant to be attached: the caller is not wrong to try and
+                        // use this function, but we can't service the request right now.
+                    }
+                    PlacementPolicy::Secondary | PlacementPolicy::Detached => {
+                        return Err(ApiError::Conflict(format!(
+                            "Shard {shard_id} tenant has policy {:?}",
+                            shard.policy
+                        )));
+                    }
+                }
+            }
+
+            return Err(ApiError::ResourceUnavailable(
+                "One or more shards in tenant is not yet attached".into(),
+            ));
+        }
+
         let locked = self.inner.read().unwrap();
-        for (tenant_shard_id, generation, generation_pageserver) in generations {
-            let node_id = generation_pageserver.ok_or(ApiError::Conflict(
-                "Tenant not currently attached".to_string(),
-            ))?;
+        for ShardGenerationState {
+            tenant_shard_id,
+            generation,
+            generation_pageserver,
+        } in generations
+        {
+            let node_id = generation_pageserver.expect("We checked for None above");
             let node = locked
                 .nodes
                 .get(&node_id)
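For clarity, the branch above can be restated as a small, testable function. This is a sketch with stand-in types, not code from the patch: the real implementation matches on `PlacementPolicy` (whose `Attached` variant carries a secondary count) and returns `ApiError::Conflict` or `ApiError::ResourceUnavailable` rather than raw status codes.

```rust
// Stand-in for PlacementPolicy, for illustration only.
enum Policy {
    Attached,
    Secondary,
    Detached,
}

/// The HTTP status a caller should see when a shard of the tenant has no
/// attached pageserver: 503 if it is meant to be attached (retry), 409 if not.
fn status_for_unattached_shard(policy: &Policy) -> u16 {
    match policy {
        // Meant to be attached, just not reconciled yet: caller should retry.
        Policy::Attached => 503,
        // Not meant to be attached at all: retrying will not help.
        Policy::Secondary | Policy::Detached => 409,
    }
}

fn main() {
    assert_eq!(status_for_unattached_shard(&Policy::Attached), 503);
    assert_eq!(status_for_unattached_shard(&Policy::Secondary), 409);
    assert_eq!(status_for_unattached_shard(&Policy::Detached), 409);
}
```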
@@ -3141,7 +3165,13 @@ impl Service {
         let latest_generations = self.persistence.peek_generations(tenant_id).await?;
         if latest_generations
             .into_iter()
-            .map(|g| (g.0, g.1))
+            .map(
+                |ShardGenerationState {
+                     tenant_shard_id,
+                     generation,
+                     generation_pageserver: _,
+                 }| (tenant_shard_id, generation),
+            )
             .collect::<Vec<_>>()
             != target_gens
                 .into_iter()
@@ -5280,72 +5310,6 @@ impl Service {
         ))
     }
 
-    /// Helper for methods that will try and call pageserver APIs for
-    /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
-    /// is attached somewhere.
-    fn ensure_attached_schedule(
-        &self,
-        mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
-        tenant_id: TenantId,
-    ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
-        let mut waiters = Vec::new();
-        let (nodes, tenants, scheduler) = locked.parts_mut();
-
-        let mut schedule_context = ScheduleContext::default();
-        for (tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
-            shard.schedule(scheduler, &mut schedule_context)?;
-
-            // The shard's policies may not result in an attached location being scheduled: this
-            // is an error because our caller needs it attached somewhere.
-            if shard.intent.get_attached().is_none() {
-                return Err(anyhow::anyhow!(
-                    "Tenant {tenant_id} not scheduled to be attached"
-                ));
-            };
-
-            if shard.stably_attached().is_some() {
-                // We do not require the shard to be totally up to date on reconciliation: we just require
-                // that it has been attached on the intended node. Other dirty state such as unattached secondary
-                // locations, or compute hook notifications can be ignored.
-                continue;
-            }
-
-            if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
-                tracing::info!("Waiting for shard {tenant_shard_id} to reconcile, in order to ensure it is attached");
-                waiters.push(waiter);
-            }
-        }
-        Ok(waiters)
-    }
-
-    async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
-        let ensure_waiters = {
-            let locked = self.inner.write().unwrap();
-
-            // Check if the tenant is splitting: in this case, even if it is attached,
-            // we must act as if it is not: this blocks e.g. timeline creation/deletion
-            // operations during the split.
-            for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
-                if !matches!(shard.splitting, SplitState::Idle) {
-                    return Err(ApiError::ResourceUnavailable(
-                        "Tenant shards are currently splitting".into(),
-                    ));
-                }
-            }
-
-            self.ensure_attached_schedule(locked, tenant_id)
-                .map_err(ApiError::InternalServerError)?
-        };
-
-        let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
-        for waiter in ensure_waiters {
-            let timeout = deadline.duration_since(Instant::now());
-            waiter.wait_timeout(timeout).await?;
-        }
-
-        Ok(())
-    }
-
     /// Like [`Self::maybe_configured_reconcile_shard`], but uses the default reconciler
     /// configuration
     fn maybe_reconcile_shard(