storcon: track preferred AZ for each tenant shard (#8937)

## Problem
We want to do AZ-aware scheduling, but we don't have enough metadata.

## Summary of changes
Introduce a `preferred_az_id` concept for each managed tenant shard.

In a future PR, the scheduler will use this as a soft preference.
The idea is to keep shard attachments within the preferred AZ.
Assuming the compute was placed in the correct AZ, this reduces
the chance of cross-AZ traffic between the compute and the pageserver (PS).

In terms of code changes, we:
1. Add a new nullable `preferred_az_id` column to the `tenant_shards`
table, along with an in-memory counterpart.
2. Populate the preferred AZ on tenant creation and shard splits.
3. Add an endpoint that allows bulk-setting preferred AZs.

(3) gives us the migration path. I'll write a script that queries the
cplane db in the region and sets the preferred AZ of each shard with an
active compute to the AZ of that compute. For shards without an active compute,
I'll use the AZ of the currently attached pageserver,
since that is what cplane currently uses to schedule computes.
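
For reference, a minimal sketch of what such a bulk-set call could look like against the new endpoint. The storage controller URL, the admin-scope token, and the shard-to-AZ map are placeholders; the real script would derive the map from the cplane query described above.

import requests

STORCON_API = "http://<storage-controller-host>:<port>"  # placeholder
ADMIN_JWT = "<admin-scope-token>"  # the endpoint requires an admin-scope token

def bulk_set_preferred_azs(shard_to_az: dict[str, str]) -> list[str]:
    # The request body is a flat JSON object mapping tenant shard ids to AZ ids.
    resp = requests.put(
        f"{STORCON_API}/control/v1/preferred_azs",
        headers={"Authorization": f"Bearer {ADMIN_JWT}"},
        json=shard_to_az,
        timeout=30,
    )
    resp.raise_for_status()
    # The response lists the shard ids that were updated.
    return resp.json()["updated"]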
Vlad Lazar
2024-09-06 13:11:17 +01:00
committed by GitHub
parent a1323231bc
commit e86fef05dd
10 changed files with 384 additions and 95 deletions

View File

@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -74,6 +74,17 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, String>,
}
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
pub updated: Vec<TenantShardId>,
}
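
Because the request struct flattens a HashMap via #[serde(flatten)], the wire format is a bare JSON object rather than a map nested under a field name. An illustrative sketch of the request and response bodies, with placeholder shard ids and AZ names:

# Illustrative bodies for PUT /control/v1/preferred_azs; the shard ids and
# AZ names below are placeholders.
request_body = {
    "<tenant_shard_id_0>": "<az-a>",
    "<tenant_shard_id_1>": "<az-b>",
}
response_body = {
    "updated": ["<tenant_shard_id_0>", "<tenant_shard_id_1>"],
}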
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -132,6 +143,8 @@ pub struct TenantDescribeResponseShard {
pub is_splitting: bool,
pub scheduling_policy: ShardSchedulingPolicy,
pub preferred_az_id: Option<String>,
}
/// Explicitly migrating a particular shard is a low level operation

View File

@@ -0,0 +1 @@
ALTER TABLE tenant_shards DROP preferred_az_id;

View File

@@ -0,0 +1 @@
ALTER TABLE tenant_shards ADD preferred_az_id VARCHAR;

View File

@@ -14,7 +14,7 @@ use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::{
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
TenantCreateRequest,
ShardsPreferredAzsRequest, TenantCreateRequest,
};
use pageserver_api::models::{
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
@@ -688,6 +688,18 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<
)
}
async fn handle_update_preferred_azs(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state.service.update_shards_preferred_azs(azs_req).await?,
)
}
async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1174,6 +1186,13 @@ pub fn make_router(
RequestName("control_v1_tenant_policy"),
)
})
.put("/control/v1/preferred_azs", |r| {
named_request_span(
r,
handle_update_preferred_azs,
RequestName("control_v1_preferred_azs"),
)
})
.put("/control/v1/step_down", |r| {
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
})

View File

@@ -105,6 +105,7 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealthOutdated,
GetLeader,
UpdateLeader,
SetPreferredAzs,
}
#[must_use]
@@ -664,6 +665,33 @@ impl Persistence {
Ok(())
}
pub(crate) async fn set_tenant_shard_preferred_azs(
&self,
preferred_azs: Vec<(TenantShardId, String)>,
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
let mut shards_updated = Vec::default();
for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set(preferred_az_id.eq(preferred_az))
.execute(conn)?;
if updated == 1 {
shards_updated.push((*tenant_shard_id, preferred_az.clone()));
}
}
Ok(shards_updated)
})
.await
}
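
For readers less familiar with Diesel: each entry becomes one UPDATE keyed by (tenant_id, shard_number, shard_count), and a shard is only reported back when exactly one row was updated. A rough Python sketch of the equivalent behaviour, assuming a hypothetical DB-API connection and shard-id object:

# Rough equivalent of set_tenant_shard_preferred_azs with a hypothetical
# DB-API connection; one UPDATE per shard, reporting only shards whose row
# actually changed (rowcount == 1).
def set_preferred_azs_sketch(conn, preferred_azs):
    updated = []
    with conn.cursor() as cur:
        for shard_id, az in preferred_azs:
            cur.execute(
                "UPDATE tenant_shards SET preferred_az_id = %s"
                " WHERE tenant_id = %s AND shard_number = %s AND shard_count = %s",
                (az, str(shard_id.tenant_id), shard_id.shard_number, shard_id.shard_count),
            )
            if cur.rowcount == 1:
                updated.append((shard_id, az))
    return updated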
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
@@ -1050,6 +1078,11 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
#[serde(default)]
pub(crate) scheduling_policy: String,
// Hint that we should attempt to schedule this tenant shard in the given
// availability zone in order to minimise the chances of cross-AZ communication
// with compute.
pub(crate) preferred_az_id: Option<String>,
}
impl TenantShardPersistence {

View File

@@ -41,6 +41,7 @@ diesel::table! {
splitting -> Int2,
config -> Text,
scheduling_policy -> Varchar,
preferred_az_id -> Nullable<Varchar>,
}
}

View File

@@ -25,7 +25,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
@@ -41,10 +41,11 @@ use itertools::Itertools;
use pageserver_api::{
controller_api::{
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
SecondaryProgress, TenantConfigRequest, TimelineArchivalConfigRequest,
@@ -353,6 +354,12 @@ impl From<DatabaseError> for ApiError {
}
}
enum InitialShardScheduleOutcome {
Scheduled(TenantCreateResponseShard),
NotScheduled,
ShardScheduleError(ScheduleError),
}
pub struct Service {
inner: Arc<std::sync::RwLock<ServiceState>>,
config: Config,
@@ -1452,6 +1459,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
};
match self.persistence.insert_tenant_shards(vec![tsp]).await {
@@ -2023,6 +2031,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
})
.collect();
@@ -2046,81 +2055,67 @@ impl Service {
};
let mut schedule_context = ScheduleContext::default();
let (waiters, response_shards) = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut schedule_error = None;
let mut response_shards = Vec::new();
let mut schcedule_error = None;
for tenant_shard_id in create_ids {
tracing::info!("Creating shard {tenant_shard_id}...");
use std::collections::btree_map::Entry;
match tenants.entry(tenant_shard_id) {
Entry::Occupied(mut entry) => {
tracing::info!(
"Tenant shard {tenant_shard_id} already exists while creating"
);
let outcome = self
.do_initial_shard_scheduling(
tenant_shard_id,
initial_generation,
&create_req.shard_parameters,
create_req.config.clone(),
placement_policy.clone(),
&mut schedule_context,
)
.await;
// TODO: schedule() should take an anti-affinity expression that pushes
// attached and secondary locations (independently) away from those
// pageservers also holding a shard for this tenant.
match outcome {
InitialShardScheduleOutcome::Scheduled(resp) => response_shards.push(resp),
InitialShardScheduleOutcome::NotScheduled => {}
InitialShardScheduleOutcome::ShardScheduleError(err) => {
schedule_error = Some(err);
}
}
}
entry
.get_mut()
.schedule(scheduler, &mut schedule_context)
.map_err(|e| {
ApiError::Conflict(format!(
"Failed to schedule shard {tenant_shard_id}: {e}"
let preferred_azs = {
let locked = self.inner.read().unwrap();
response_shards
.iter()
.filter_map(|resp| {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
Some((resp.shard_id, az_id))
})
.collect::<Vec<_>>()
};
// Note that we persist the preferred AZ for the new shards separately.
// In theory, we could "peek" the scheduler to determine where the shard will
// land, but the subsequent "real" call into the scheduler might select a different
// node. Hence, we do this awkward update to keep things consistent.
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
})?;
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
.get()
.generation
.expect("Generation is set when in attached mode");
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
});
}
continue;
}
Entry::Vacant(entry) => {
let state = entry.insert(TenantShard::new(
tenant_shard_id,
ShardIdentity::from_params(
tenant_shard_id.shard_number,
&create_req.shard_parameters,
),
placement_policy.clone(),
));
state.generation = initial_generation;
state.config = create_req.config.clone();
if let Err(e) = state.schedule(scheduler, &mut schedule_context) {
schcedule_error = Some(e);
}
// Only include shards in result if we are attaching: the purpose
// of the response is to tell the caller where the shards are attached.
if let Some(node_id) = state.intent.get_attached() {
let generation = state
.generation
.expect("Generation is set when in attached mode");
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
});
{
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
};
}
// If we failed to schedule shards, then they are still created in the controller,
@@ -2128,17 +2123,19 @@ impl Service {
// tries to e.g. create a tenant whose placement policy requires more nodes than
// are present in the system. We do this here rather than in the above loop, to
// avoid situations where we only create a subset of shards in the tenant.
if let Some(e) = schcedule_error {
if let Some(e) = schedule_error {
return Err(ApiError::Conflict(format!(
"Failed to schedule shard(s): {e}"
)));
}
let waiters = tenants
let waiters = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
.collect::<Vec<_>>();
(waiters, response_shards)
.collect::<Vec<_>>()
};
Ok((
@@ -2149,6 +2146,78 @@ impl Service {
))
}
/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
/// case of a new tenant and a pre-existing one.
async fn do_initial_shard_scheduling(
&self,
tenant_shard_id: TenantShardId,
initial_generation: Option<Generation>,
shard_params: &ShardParameters,
config: TenantConfig,
placement_policy: PlacementPolicy,
schedule_context: &mut ScheduleContext,
) -> InitialShardScheduleOutcome {
let mut locked = self.inner.write().unwrap();
let (_nodes, tenants, scheduler) = locked.parts_mut();
use std::collections::btree_map::Entry;
match tenants.entry(tenant_shard_id) {
Entry::Occupied(mut entry) => {
tracing::info!("Tenant shard {tenant_shard_id} already exists while creating");
// TODO: schedule() should take an anti-affinity expression that pushes
// attached and secondary locations (independently) away from those
// pageservers also holding a shard for this tenant.
if let Err(err) = entry.get_mut().schedule(scheduler, schedule_context) {
return InitialShardScheduleOutcome::ShardScheduleError(err);
}
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
.get()
.generation
.expect("Generation is set when in attached mode");
InitialShardScheduleOutcome::Scheduled(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
})
} else {
InitialShardScheduleOutcome::NotScheduled
}
}
Entry::Vacant(entry) => {
let state = entry.insert(TenantShard::new(
tenant_shard_id,
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
placement_policy,
));
state.generation = initial_generation;
state.config = config;
if let Err(e) = state.schedule(scheduler, schedule_context) {
return InitialShardScheduleOutcome::ShardScheduleError(e);
}
// Only include shards in result if we are attaching: the purpose
// of the response is to tell the caller where the shards are attached.
if let Some(node_id) = state.intent.get_attached() {
let generation = state
.generation
.expect("Generation is set when in attached mode");
InitialShardScheduleOutcome::Scheduled(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
})
} else {
InitialShardScheduleOutcome::NotScheduled
}
}
}
}
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
/// wait for reconciliation to complete before responding.
async fn await_waiters(
@@ -3511,6 +3580,7 @@ impl Service {
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
scheduling_policy: *shard.get_scheduling_policy(),
preferred_az_id: shard.preferred_az().map(ToString::to_string),
})
}
@@ -4214,9 +4284,10 @@ impl Service {
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::Splitting,
// Scheduling policies do not carry through to children
// Scheduling policies and preferred AZ do not carry through to children
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
});
}
@@ -4336,6 +4407,47 @@ impl Service {
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Now that we have scheduled the child shards, attempt to set their preferred AZ
// to that of the pageserver they've been attached on.
let preferred_azs = {
let locked = self.inner.read().unwrap();
child_locations
.iter()
.filter_map(|(tid, node_id, _stripe_size)| {
let az_id = locked
.nodes
.get(node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
Some((*tid, az_id))
})
.collect::<Vec<_>>()
};
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
});
match updated {
Ok(updated) => {
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
Err(err) => {
tracing::warn!("Failed to persist preferred AZs after split: {err}");
}
}
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {
@@ -6497,4 +6609,35 @@ impl Service {
) -> Result<(), DatabaseError> {
self.persistence.safekeeper_upsert(record).await
}
pub(crate) async fn update_shards_preferred_azs(
&self,
req: ShardsPreferredAzsRequest,
) -> Result<ShardsPreferredAzsResponse, ApiError> {
let preferred_azs = req.preferred_az_ids.into_iter().collect::<Vec<_>>();
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred AZs: {err}"
))
})?;
let mut updated_in_mem_and_db = Vec::default();
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
let shard = locked.tenants.get_mut(&tid);
if let Some(shard) = shard {
shard.set_preferred_az(az_id);
updated_in_mem_and_db.push(tid);
}
}
Ok(ShardsPreferredAzsResponse {
updated: updated_in_mem_and_db,
})
}
}

View File

@@ -140,6 +140,10 @@ pub(crate) struct TenantShard {
// Support/debug tool: if something is going wrong or flapping with scheduling, this may
// be set to a non-active state to avoid making changes while the issue is fixed.
scheduling_policy: ShardSchedulingPolicy,
// We should attempt to schedule this shard in the provided AZ to
// decrease the chances of cross-AZ traffic between the compute and the pageserver.
preferred_az_id: Option<String>,
}
#[derive(Default, Clone, Debug, Serialize)]
@@ -463,6 +467,7 @@ impl TenantShard {
last_error: Arc::default(),
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_az_id: None,
}
}
@@ -1297,6 +1302,7 @@ impl TenantShard {
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
preferred_az_id: tsp.preferred_az_id,
})
}
@@ -1312,8 +1318,17 @@ impl TenantShard {
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
preferred_az_id: self.preferred_az_id.clone(),
}
}
pub(crate) fn preferred_az(&self) -> Option<&str> {
self.preferred_az_id.as_deref()
}
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
self.preferred_az_id = Some(preferred_az_id);
}
}
#[cfg(test)]

View File

@@ -2560,7 +2560,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_describe(self, tenant_id: TenantId):
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int}
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int, preferred_az_id: str}
"""
response = self.request(
"GET",
@@ -2886,6 +2886,17 @@ class NeonStorageController(MetricsGetter, LogUtils):
return None
raise e
def set_preferred_azs(self, preferred_azs: dict[TenantShardId, str]) -> list[TenantShardId]:
response = self.request(
"PUT",
f"{self.api}/control/v1/preferred_azs",
headers=self.headers(TokenScope.ADMIN),
json={str(tid): az for tid, az in preferred_azs.items()},
)
response.raise_for_status()
return [TenantShardId.parse(tid) for tid in response.json()["updated"]]
def __enter__(self) -> "NeonStorageController":
return self

View File

@@ -2512,3 +2512,55 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
del d[key]
return compared[0] == compared[1]
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
def assign_az(ps_cfg):
az = f"az-{ps_cfg['id']}"
ps_cfg["availability_zone"] = az
neon_env_builder.pageserver_config_override = assign_az
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tids = [TenantId.generate() for _ in range(0, 3)]
for tid in tids:
env.storage_controller.tenant_create(tid)
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
attached_to = shards[0]["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
assert shards[0]["preferred_az_id"] == expected_az
updated = env.storage_controller.set_preferred_azs(
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
)
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
for tid in tids:
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
assert shards[0]["preferred_az_id"] == "foo"
# Generate a layer so that shard split handling on the pageserver doesn't
# trip up on a debug assert.
timeline_id = TimelineId.generate()
env.neon_cli.create_timeline("bar", tids[0], timeline_id)
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
workload.init()
workload.write_rows(256)
workload.validate()
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
assert len(shards) == 2
for shard in shards:
attached_to = shard["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == expected_az