control_plane: basic balancer API

This commit is contained in:
John Spray
2024-03-04 15:01:22 +00:00
parent 59523444cc
commit 27815678e7
2 changed files with 172 additions and 0 deletions

View File

@@ -438,6 +438,24 @@ async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiEr
state.service.tenants_dump()
}
/// HTTP handler behind `POST /control/v1/balance/all`.
///
/// Requires the `Admin` scope on the request, then delegates to
/// `Service::balance_all`.  Replies `200 OK` with an empty JSON body when the
/// service call succeeds; any `ApiError` from the permission check or from the
/// service is returned to the caller unchanged.
async fn handle_balance_all(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    service
        .balance_all()
        .and_then(|()| json_response(StatusCode::OK, ()))
}
/// HTTP handler behind `POST /control/v1/balance/attached`.
///
/// Requires the `Admin` scope on the request, then delegates to
/// `Service::balance_attached`.  Replies `200 OK` with an empty JSON body when
/// the service call succeeds; any `ApiError` from the permission check or from
/// the service is returned to the caller unchanged.
async fn handle_balance_attached(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    service
        .balance_attached()
        .and_then(|()| json_response(StatusCode::OK, ()))
}
async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -572,6 +590,12 @@ pub fn make_router(
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
})
.post("/control/v1/balance/all", |r| {
tenant_service_handler(r, handle_balance_all)
})
.post("/control/v1/balance/attached", |r| {
tenant_service_handler(r, handle_balance_attached)
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.

View File

@@ -2931,6 +2931,154 @@ impl Service {
Ok(())
}
/// Switch shards between primary and secondary locations if it improves balance
/// of attachments across nodes. This is useful after e.g. a node goes offline
/// and returns, since its attachments will have moved away.
///
/// The work happens in two phases:
///
/// 1. Under the read lock, count attachments per node.  Every node that may be
///    scheduled onto is included (with a count of zero if it currently holds no
///    attachments); nodes that may not be scheduled onto are excluded.
/// 2. Under the write lock, for each shard whose attached node holds more than
///    `avg + 1` attachments and more than the shard's first secondary node,
///    promote that secondary to attached and kick off reconciliation.
///
/// Only the first secondary location of each shard is considered.  The lock is
/// released between the two phases, so the counts are a snapshot.
///
/// Returns `Ok(())` in all cases visible here; if there are no schedulable nodes
/// there is nothing to balance and the function returns without taking the
/// write lock.
pub(crate) fn balance_attached(&self) -> Result<(), ApiError> {
    let mut attachment_counts: HashMap<NodeId, usize> = HashMap::new();

    {
        let locked = self.inner.read().unwrap();

        for shard in locked.tenants.values() {
            if let Some(node_id) = shard.intent.get_attached() {
                *attachment_counts.entry(*node_id).or_insert(0) += 1;
            }
        }

        // Exclude un-schedulable nodes from our set of those we consider overloaded: we
        // will aim to balance the expected average shards/node among those eligible for
        // scheduling.  Schedulable nodes with no attachments must still get an entry:
        // they are the most attractive promotion targets (e.g. a node that has just come
        // back online), and they must count towards the average.
        for (node_id, node) in locked.nodes.iter() {
            if node.may_schedule() {
                attachment_counts.entry(*node_id).or_insert(0);
            } else {
                attachment_counts.remove(node_id);
            }
        }
    }

    // With no schedulable nodes there is nothing to balance, and the average below
    // would divide by zero.
    if attachment_counts.is_empty() {
        return Ok(());
    }

    let avg: usize = attachment_counts.values().sum::<usize>() / attachment_counts.len();

    // For any shards that are attached to a node which is more over-loaded than
    // their secondary location, switch to the secondary location
    {
        let mut locked = self.inner.write().unwrap();
        let result_tx = locked.result_tx.clone();
        let compute_hook = locked.compute_hook.clone();
        let (nodes, tenants, scheduler) = locked.parts_mut();

        for shard in tenants.values_mut() {
            if let (Some(primary), Some(secondary)) = (
                *shard.intent.get_attached(),
                shard.intent.get_secondary().iter().next().copied(),
            ) {
                // Both ends of the move must be nodes we are tracking, i.e. schedulable.
                if let (Some(primary_count), Some(secondary_count)) = (
                    attachment_counts.get(&primary).copied(),
                    attachment_counts.get(&secondary).copied(),
                ) {
                    if primary_count > avg + 1 && primary_count > secondary_count {
                        shard.intent.promote_attached(scheduler, secondary);
                        shard.maybe_reconcile(
                            result_tx.clone(),
                            nodes,
                            &compute_hook,
                            &self.config,
                            &self.persistence,
                            &self.gate,
                            &self.cancel,
                        );

                        // Keep the running counts in step with the move we just made:
                        // the old primary lost an attachment, the promoted secondary
                        // gained one.  `primary_count > avg + 1 >= 1`, so the decrement
                        // cannot underflow.
                        if let Some(count) = attachment_counts.get_mut(&primary) {
                            *count -= 1;
                        }
                        if let Some(count) = attachment_counts.get_mut(&secondary) {
                            *count += 1;
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
/// Rebalance shard locations (attached and secondary) across schedulable nodes.
///
/// The work happens in three phases:
///
/// 1. Under the read lock, count every location per node (via
///    `intent.all_pageservers()`), dropping nodes that may not be scheduled onto,
///    and convert each count into an "excess" of `count - (avg - 1)`, saturating
///    at zero.
/// 2. Under the write lock, walk all shards and, while a node still has excess,
///    remove the shard's first secondary location from it -- or, for shards with
///    no secondaries, clear the attached location.
/// 3. Under the write lock again, call `schedule()` on every shard so the freed
///    locations are re-placed by the scheduler, then kick off reconciliation.  If
///    scheduling any shard fails, a warning is logged and the pass stops early.
///
/// Returns `Ok(())` in all cases visible here.  If no schedulable node hosts any
/// location there is nothing to balance and the function returns immediately.
pub(crate) fn balance_all(&self) -> Result<(), ApiError> {
    // Our to-do list is a map of nodes to how many excess locations they have. We will
    // improve balance by de-scheduling secondary locations from nodes that have too many
    // locations. When we later call schedule() on these shards, they will implicitly
    // be assigned to locations that are under-loaded.
    let mut overloads: HashMap<NodeId, usize> = HashMap::new();

    {
        let locked = self.inner.read().unwrap();

        for shard in locked.tenants.values() {
            for node_id in shard.intent.all_pageservers() {
                *overloads.entry(node_id).or_insert(0) += 1;
            }
        }

        // Exclude un-schedulable nodes from our set of those we consider overloaded: we
        // will aim to balance the expected average shards/node among those eligible for
        // scheduling.
        for (node_id, node) in locked.nodes.iter() {
            if !node.may_schedule() {
                overloads.remove(node_id);
            }
        }
    }

    // Nothing is placed on a schedulable node (no shards at all, or every hosting node
    // is un-schedulable): there is nothing to balance, and the average below would
    // divide by zero.
    if overloads.is_empty() {
        return Ok(());
    }

    // Every surviving entry was created by a `+= 1`, so each count is >= 1 and hence
    // `avg >= 1`: the `avg - 1` below cannot underflow.
    let avg: usize = overloads.values().sum::<usize>() / overloads.len();

    // NOTE(review): the threshold here is `avg - 1`, so a node sitting exactly at the
    // average is treated as having one excess location, whereas `balance_attached`
    // tolerates up to `avg + 1`.  Confirm this asymmetry is intended.
    for count in overloads.values_mut() {
        *count = count.saturating_sub(avg - 1)
    }

    // De-schedule from overloaded nodes
    {
        let mut locked = self.inner.write().unwrap();
        let (_nodes, tenants, scheduler) = locked.parts_mut();

        for shard in tenants.values_mut() {
            // We will only consider one secondary location per shard, or the attached location
            // if it has no secondaries.
            if let Some(candidate) = shard.intent.get_secondary().iter().next() {
                if let Some(node_count) = overloads.get_mut(candidate) {
                    if *node_count > 0 {
                        *node_count -= 1;
                        shard.intent.remove_secondary(scheduler, *candidate);
                    }
                }
            } else if let Some(candidate) = shard.intent.get_attached() {
                if let Some(node_count) = overloads.get_mut(candidate) {
                    if *node_count > 0 {
                        *node_count -= 1;
                        // The shard is left with no attached location in its intent
                        // until the schedule() pass below assigns a new one.
                        shard.intent.set_attached(scheduler, None);
                    }
                }
            }
        }
    }

    // Call schedule on all shards to allocate new locations on under-loaded nodes
    {
        let mut locked = self.inner.write().unwrap();
        let result_tx = locked.result_tx.clone();
        let compute_hook = locked.compute_hook.clone();
        let (nodes, tenants, scheduler) = locked.parts_mut();

        for (shard_id, shard) in tenants {
            if let Err(e) = shard.schedule(scheduler) {
                tracing::warn!("Could not schedule {shard_id}: {e}");
                // If we couldn't schedule one shard, we probably won't be able to schedule
                // the rest: log warning once and drop out.
                break;
            }

            shard.maybe_reconcile(
                result_tx.clone(),
                nodes,
                &compute_hook,
                &self.config,
                &self.persistence,
                &self.gate,
                &self.cancel,
            );
        }
    }

    Ok(())
}
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.