storage controller: quality of life improvements for AZ handling (#10379)

## Problem

Since https://github.com/neondatabase/neon/pull/9916, the preferred AZ
of a tenant is much more impactful, and we would like to make it more
visible in tooling.

## Summary of changes

- Include AZ in node describe API
- Include AZ info in node & tenant outputs in CLI
- Add metrics for per-node shard counts, labelled by AZ
- Add a CLI for setting preferred AZ on a tenant
- Extend AZ-setting API+CLI to handle None for clearing preferred AZ
This commit is contained in:
John Spray
2025-01-14 15:30:43 +00:00
committed by GitHub
parent 2466a2f977
commit aa7323a384
8 changed files with 175 additions and 18 deletions

View File

@@ -1,12 +1,16 @@
use futures::StreamExt;
use std::{str::FromStr, time::Duration};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -153,6 +157,12 @@ enum Command {
#[arg(long)]
tenant_id: TenantId,
},
TenantSetPreferredAz {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
preferred_az: Option<String>,
},
/// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
/// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
TenantDrop {
@@ -402,11 +412,12 @@ async fn main() -> anyhow::Result<()> {
resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
for node in resp {
table.add_row([
format!("{}", node.id),
node.listen_http_addr,
node.availability_zone_id,
format!("{:?}", node.scheduling),
format!("{:?}", node.availability),
]);
@@ -479,6 +490,7 @@ async fn main() -> anyhow::Result<()> {
let mut table = comfy_table::Table::new();
table.set_header([
"TenantId",
"Preferred AZ",
"ShardCount",
"StripeSize",
"Placement",
@@ -488,6 +500,11 @@ async fn main() -> anyhow::Result<()> {
let shard_zero = tenant.shards.into_iter().next().unwrap();
table.add_row([
format!("{}", tenant.tenant_id),
shard_zero
.preferred_az_id
.as_ref()
.cloned()
.unwrap_or("".to_string()),
format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
format!("{:?}", tenant.stripe_size),
format!("{:?}", tenant.policy),
@@ -614,6 +631,19 @@ async fn main() -> anyhow::Result<()> {
None,
)
.await?;
let nodes = storcon_client
.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"control/v1/node".to_string(),
None,
)
.await?;
let nodes = nodes
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<_, _>>();
println!("Tenant {tenant_id}");
let mut table = comfy_table::Table::new();
table.add_row(["Policy", &format!("{:?}", policy)]);
@@ -622,7 +652,14 @@ async fn main() -> anyhow::Result<()> {
println!("{table}");
println!("Shards:");
let mut table = comfy_table::Table::new();
table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]);
table.set_header([
"Shard",
"Attached",
"Attached AZ",
"Secondary",
"Last error",
"status",
]);
for shard in shards {
let secondary = shard
.node_secondary
@@ -645,11 +682,18 @@ async fn main() -> anyhow::Result<()> {
}
let status = status_parts.join(",");
let attached_node = shard
.node_attached
.as_ref()
.map(|id| nodes.get(id).expect("Shard references nonexistent node"));
table.add_row([
format!("{}", shard.tenant_shard_id),
shard
.node_attached
.map(|n| format!("{}", n))
attached_node
.map(|n| format!("{} ({})", n.listen_http_addr, n.id))
.unwrap_or(String::new()),
attached_node
.map(|n| n.availability_zone_id.clone())
.unwrap_or(String::new()),
secondary,
shard.last_error,
@@ -658,6 +702,66 @@ async fn main() -> anyhow::Result<()> {
}
println!("{table}");
}
Command::TenantSetPreferredAz {
tenant_id,
preferred_az,
} => {
// First learn about the tenant's shards
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await?;
// Learn about nodes to validate the AZ ID
let nodes = storcon_client
.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"control/v1/node".to_string(),
None,
)
.await?;
if let Some(preferred_az) = &preferred_az {
let azs = nodes
.into_iter()
.map(|n| (n.availability_zone_id))
.collect::<HashSet<_>>();
if !azs.contains(preferred_az) {
anyhow::bail!(
"AZ {} not found on any node: known AZs are: {:?}",
preferred_az,
azs
);
}
} else {
// Make it obvious to the user that since they've omitted an AZ, we're clearing it
eprintln!("Clearing preferred AZ for tenant {}", tenant_id);
}
// Construct a request that modifies all the tenant's shards
let req = ShardsPreferredAzsRequest {
preferred_az_ids: describe_response
.shards
.into_iter()
.map(|s| {
(
s.tenant_shard_id,
preferred_az.clone().map(AvailabilityZone),
)
})
.collect(),
};
storcon_client
.dispatch::<ShardsPreferredAzsRequest, ()>(
Method::PUT,
"control/v1/preferred_azs".to_string(),
Some(req),
)
.await?;
}
Command::TenantWarmup { tenant_id } => {
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(

View File

@@ -87,7 +87,7 @@ impl Display for AvailabilityZone {
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, AvailabilityZone>,
pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
}
#[derive(Serialize, Deserialize)]
@@ -144,6 +144,8 @@ pub struct NodeDescribeResponse {
pub availability: NodeAvailabilityWrapper,
pub scheduling: NodeSchedulingPolicy,
pub availability_zone_id: String,
pub listen_http_addr: String,
pub listen_http_port: u16,

View File

@@ -53,6 +53,16 @@ pub(crate) struct StorageControllerMetricGroup {
/// How many shards are not scheduled into their preferred AZ
pub(crate) storage_controller_schedule_az_violation: measured::Gauge,
/// How many shard locations (secondary or attached) on each node
pub(crate) storage_controller_node_shards: measured::GaugeVec<NodeLabelGroupSet>,
/// How many _attached_ shard locations on each node
pub(crate) storage_controller_node_attached_shards: measured::GaugeVec<NodeLabelGroupSet>,
/// How many _home_ shard locations on each node (i.e. the node's AZ matches the shard's
/// preferred AZ)
pub(crate) storage_controller_node_home_shards: measured::GaugeVec<NodeLabelGroupSet>,
/// How many shards would like to reconcile but were blocked by concurrency limits
pub(crate) storage_controller_pending_reconciles: measured::Gauge,
@@ -132,6 +142,15 @@ impl Default for StorageControllerMetrics {
}
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = NodeLabelGroupSet)]
pub(crate) struct NodeLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) az: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) node_id: &'a str,
}
#[derive(measured::LabelGroup)]
#[label(set = ReconcileCompleteLabelGroupSet)]
pub(crate) struct ReconcileCompleteLabelGroup {

View File

@@ -299,6 +299,7 @@ impl Node {
id: self.id,
availability: self.availability.clone().into(),
scheduling: self.scheduling,
availability_zone_id: self.availability_zone_id.0.clone(),
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port,
listen_pg_addr: self.listen_pg_addr.clone(),

View File

@@ -708,10 +708,11 @@ impl Persistence {
Ok(())
}
/// Note that passing None for a shard clears the preferred AZ (rather than leaving it unmodified)
pub(crate) async fn set_tenant_shard_preferred_azs(
&self,
preferred_azs: Vec<(TenantShardId, AvailabilityZone)>,
) -> DatabaseResult<Vec<(TenantShardId, AvailabilityZone)>> {
preferred_azs: Vec<(TenantShardId, Option<AvailabilityZone>)>,
) -> DatabaseResult<Vec<(TenantShardId, Option<AvailabilityZone>)>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
@@ -722,7 +723,7 @@ impl Persistence {
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set(preferred_az_id.eq(preferred_az.0.clone()))
.set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone())))
.execute(conn)?;
if updated == 1 {

View File

@@ -1,4 +1,4 @@
use crate::{node::Node, tenant_shard::TenantShard};
use crate::{metrics::NodeLabelGroup, node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
use serde::Serialize;
@@ -872,6 +872,33 @@ impl Scheduler {
pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
self.nodes.get(&node_id).unwrap().attached_shard_count
}
/// Some metrics that we only calculate periodically: this is simpler than
/// rigorously updating them on every change.
pub(crate) fn update_metrics(&self) {
for (node_id, node) in &self.nodes {
let node_id_str = format!("{}", node_id);
let label_group = NodeLabelGroup {
az: &node.az.0,
node_id: &node_id_str,
};
crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_node_shards
.set(label_group.clone(), node.shard_count as i64);
crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_node_attached_shards
.set(label_group.clone(), node.attached_shard_count as i64);
crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_node_home_shards
.set(label_group.clone(), node.home_shard_count as i64);
}
}
}
#[cfg(test)]

View File

@@ -2517,7 +2517,7 @@ impl Service {
.map(|t| {
(
t.get_tenant_shard_id().expect("Corrupt shard in database"),
load_in_az.clone(),
Some(load_in_az.clone()),
)
})
.collect(),
@@ -6390,7 +6390,7 @@ impl Service {
/// available. A return value of 0 indicates that everything is fully reconciled already.
fn reconcile_all(&self) -> usize {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
let (nodes, tenants, scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
// This function is an efficient place to update lazy statistics, since we are walking
@@ -6451,6 +6451,9 @@ impl Service {
}
}
// Some metrics are calculated from SchedulerNode state, update these periodically
scheduler.update_metrics();
// Process any deferred tenant drops
for (tenant_id, guard) in drop_detached_tenants {
self.maybe_drop_tenant(tenant_id, &mut locked, &guard);
@@ -6509,7 +6512,7 @@ impl Service {
// Shard was dropped between planning and execution;
continue;
};
tracing::info!("Applying optimization: {optimization:?}");
tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
if shard.apply_optimization(scheduler, optimization) {
optimizations_applied += 1;
if self.maybe_reconcile_shard(shard, nodes).is_some() {

View File

@@ -1708,8 +1708,8 @@ impl TenantShard {
self.intent.preferred_az_id.as_ref()
}
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
self.intent.preferred_az_id = Some(preferred_az_id);
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: Option<AvailabilityZone>) {
self.intent.preferred_az_id = preferred_az_id;
}
/// Returns all the nodes to which this tenant shard is attached according to the