Compare commits

...

1 Commit

Author: John Spray
SHA1: a34e274e08
Message: control_plane/attachment_service: improved Scheduler
Date: 2024-02-02 17:54:24 +00:00
4 changed files with 221 additions and 68 deletions

View File

@@ -18,6 +18,23 @@ use crate::compute_hook::ComputeHook;
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
/// This is a snapshot of [`crate::tenant_state::IntentState`], but it holds NodeId instead of
/// Rc<> references to the scheduler.
#[derive(Debug)]
pub(crate) struct TargetState {
pub(crate) attached: Option<NodeId>,
pub(crate) secondary: Vec<NodeId>,
}
impl TargetState {
pub(crate) fn new(intent: &IntentState) -> Self {
Self {
attached: intent.attached.map(|n| n.id()),
secondary: intent.secondary.iter().map(|n| n.id()).collect(),
}
}
}
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
@@ -26,7 +43,7 @@ pub(super) struct Reconciler {
pub(super) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
pub(crate) generation: Generation,
pub(crate) intent: IntentState,
pub(crate) intent: TargetState,
pub(crate) config: TenantConfig,
pub(crate) observed: ObservedState,
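
The TargetState introduced above duplicates IntentState with plain NodeIds, presumably so the intent can be captured under the Service lock and then handed to the spawned reconcile task (the scheduler's Rc handles are not Send). A minimal, self-contained sketch of that pattern, using simplified hypothetical types rather than the crate's own:

use std::rc::Rc;

#[derive(Clone, Copy, Debug)]
struct NodeId(u64);

// Rc-based intent, as held while the big Service lock is taken.
struct IntentState {
    attached: Option<Rc<NodeId>>,
}

// Plain-NodeId snapshot handed to the background task.
#[derive(Debug)]
struct TargetState {
    attached: Option<NodeId>,
}

impl TargetState {
    fn new(intent: &IntentState) -> Self {
        Self {
            attached: intent.attached.as_deref().copied(),
        }
    }
}

fn main() {
    let intent = IntentState {
        attached: Some(Rc::new(NodeId(1))),
    };
    let target = TargetState::new(&intent);
    // `target` is Send; moving the Rc-holding `intent` into the thread would not compile.
    std::thread::spawn(move || println!("reconciling towards {target:?}"))
        .join()
        .unwrap();
}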

View File

@@ -1,8 +1,8 @@
use pageserver_api::shard::TenantShardId;
use std::collections::{BTreeMap, HashMap};
use control_plane::attachment_service::NodeSchedulingPolicy;
use std::{collections::HashMap, rc::Rc};
use utils::{http::error::ApiError, id::NodeId};
use crate::{node::Node, tenant_state::TenantState};
use crate::node::Node;
/// Scenarios in which we cannot find a suitable location for a tenant shard
#[derive(thiserror::Error, Debug)]
@@ -19,49 +19,170 @@ impl From<ScheduleError> for ApiError {
}
}
struct AttachedTarget(NodeId);
struct SecondaryTarget(NodeId);
/// A reference to a node which has been scheduled for use as an attached location
pub(crate) struct NodeAttached(Rc<AttachedTarget>);
/// A reference to a node which has been scheduled for use as a secondary location
pub(crate) struct NodeSecondary(Rc<SecondaryTarget>);
impl NodeAttached {
pub(crate) fn id(&self) -> NodeId {
(*self.0).0
}
}
impl std::fmt::Debug for NodeAttached {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.id())
}
}
impl NodeSecondary {
pub(crate) fn id(&self) -> NodeId {
(*self.0).0
}
}
impl std::fmt::Debug for NodeSecondary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.id())
}
}
/// What's the difference between [`Node`] and [`SchedulerNode`]?
/// - The regular Node type is a snapshot of the configuration of an external node: it holds addresses and ports, so
/// that if you have a Node, you can make pageserver API calls. Nodes get cloned around into
/// async tasks, and an Arc<> of a map of nodes represents a snapshot of which nodes exist when
/// starting such a task.
/// - Scheduler nodes live inside the Scheduler, which lives inside the big Service lock. These are not
/// a handle for external calls; they're just for keeping track of internal per-node state, like which
/// shards intend to attach to which node.
pub(crate) struct SchedulerNode {
node_id: NodeId,
policy: NodeSchedulingPolicy,
attached: Rc<AttachedTarget>,
secondary: Rc<SecondaryTarget>,
}
impl Drop for SchedulerNode {
fn drop(&mut self) {
// Safety check that we will never unhook a node from the scheduler while
// some tenants are still referencing it. This is important because tenants
// assume that the nodes they reference are always still present.
assert!(Rc::<AttachedTarget>::strong_count(&self.attached) == 1);
assert!(Rc::<SecondaryTarget>::strong_count(&self.secondary) == 1);
}
}
/// Parameter for scheduling that expresses which nodes a shard _doesn't_ want to go on. Typically
/// used to avoid scheduling a secondary location on the same node as an attached location, or vice
/// versa.
pub(crate) struct ShardConstraints {
anti_affinity: Vec<NodeId>,
}
impl ShardConstraints {
pub(crate) fn new() -> Self {
Self {
anti_affinity: Vec::new(),
}
}
pub(crate) fn push_anti_affinity(&mut self, nodes: Vec<NodeId>) {
self.anti_affinity.extend(nodes.into_iter())
}
pub(crate) fn push_anti_attached(&mut self, n: &NodeAttached) {
self.anti_affinity.push(n.id())
}
pub(crate) fn push_anti_secondary(&mut self, n: &NodeSecondary) {
self.anti_affinity.push(n.id())
}
}
pub(crate) struct Scheduler {
tenant_counts: HashMap<NodeId, usize>,
nodes: HashMap<NodeId, SchedulerNode>,
}
impl Scheduler {
pub(crate) fn new(
tenants: &BTreeMap<TenantShardId, TenantState>,
nodes: &HashMap<NodeId, Node>,
) -> Self {
let mut tenant_counts = HashMap::new();
for node_id in nodes.keys() {
tenant_counts.insert(*node_id, 0);
pub(crate) fn new(nodes: &HashMap<NodeId, Node>) -> Self {
Self {
nodes: nodes
.iter()
.map(|(node_id, node)| {
(
*node_id,
SchedulerNode {
node_id: *node_id,
attached: Rc::new(AttachedTarget(*node_id)),
secondary: Rc::new(SecondaryTarget(*node_id)),
policy: node.scheduling,
},
)
})
.collect(),
}
for tenant in tenants.values() {
if let Some(ps) = tenant.intent.attached {
let entry = tenant_counts.entry(ps).or_insert(0);
*entry += 1;
}
}
for (node_id, node) in nodes {
if !node.may_schedule() {
tenant_counts.remove(node_id);
}
}
Self { tenant_counts }
}
pub(crate) fn schedule_shard(
pub(crate) fn node_downgrade(&self, node_attached: NodeAttached) -> NodeSecondary {
// unwrap safety: we never drop a SchedulerNode while it is referenced by tenants: see check
// in [`SchedulerNode::drop`].
let node = self.nodes.get(&node_attached.id()).unwrap();
NodeSecondary(node.secondary.clone())
}
/// For use during startup, when we know a node ID and want to increment the scheduler refcounts while
/// we populate an IntentState
pub(crate) fn node_reference_attached(&self, node_id: NodeId) -> NodeAttached {
let node = self.nodes.get(&node_id).unwrap();
NodeAttached(node.attached.clone())
}
pub(crate) fn node_reference_secondary(&self, node_id: NodeId) -> NodeSecondary {
let node = self.nodes.get(&node_id).unwrap();
NodeSecondary(node.secondary.clone())
}
pub(crate) fn schedule_shard_attached(
&mut self,
hard_exclude: &[NodeId],
) -> Result<NodeId, ScheduleError> {
if self.tenant_counts.is_empty() {
constraints: &ShardConstraints,
) -> Result<NodeAttached, ScheduleError> {
let node = self.schedule_shard(constraints)?;
Ok(NodeAttached(node.attached.clone()))
}
pub(crate) fn schedule_shard_secondary(
&mut self,
constraints: &ShardConstraints,
) -> Result<NodeSecondary, ScheduleError> {
let node = self.schedule_shard(constraints)?;
Ok(NodeSecondary(node.secondary.clone()))
}
fn schedule_shard(
&mut self,
constraints: &ShardConstraints,
) -> Result<&SchedulerNode, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}
let mut tenant_counts: Vec<(NodeId, usize)> = self
.tenant_counts
let mut tenant_counts = HashMap::new();
for node in self.nodes.values() {
tenant_counts.insert(
node.node_id,
Rc::<AttachedTarget>::strong_count(&node.attached)
+ Rc::<SecondaryTarget>::strong_count(&node.secondary),
);
}
let mut tenant_counts: Vec<(NodeId, usize)> = tenant_counts
.iter()
.filter_map(|(k, v)| {
if hard_exclude.contains(k) {
if constraints.anti_affinity.contains(k) {
None
} else {
Some((*k, *v))
@@ -84,6 +205,6 @@ impl Scheduler {
let node_id = tenant_counts.first().unwrap().0;
tracing::info!("scheduler selected node {node_id}");
*self.tenant_counts.get_mut(&node_id).unwrap() += 1;
Ok(node_id)
Ok(self.nodes.get(&node_id).unwrap())
}
}
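
The scheduler above leans on Rc reference counts twice: SchedulerNode::drop asserts that no tenant still references a node, and schedule_shard derives per-node load from Rc::strong_count rather than a separately maintained tenant_counts map. A self-contained sketch of that pattern, with a hypothetical NodeSlot type standing in for SchedulerNode:

use std::rc::Rc;

struct NodeSlot {
    attached: Rc<()>,
}

impl NodeSlot {
    fn new() -> Self {
        Self { attached: Rc::new(()) }
    }

    // Handing out a clone is the act of scheduling: there is no separate counter to update.
    fn schedule(&self) -> Rc<()> {
        self.attached.clone()
    }

    // Load is derived from the refcount (minus the slot's own reference).
    fn load(&self) -> usize {
        Rc::strong_count(&self.attached) - 1
    }
}

fn main() {
    let node = NodeSlot::new();
    let shard_a = node.schedule();
    let shard_b = node.schedule();
    assert_eq!(node.load(), 2);

    drop(shard_a);
    drop(shard_b);
    // Mirrors the assertion in SchedulerNode::drop: nothing may still reference the node.
    assert_eq!(Rc::strong_count(&node.attached), 1);
}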

View File

@@ -181,7 +181,7 @@ impl Service {
// Populate each tenant's intent state
let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
tenant_state.intent_from_observed();
tenant_state.intent_from_observed(&scheduler);
if let Err(e) = tenant_state.schedule(&mut scheduler) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
@@ -259,6 +259,8 @@ impl Service {
let mut tenants = BTreeMap::new();
let scheduler = Scheduler::new(&nodes);
for tsp in tenant_shard_persistence {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
@@ -279,7 +281,8 @@ impl Service {
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
if tsp.generation_pageserver != i64::MAX {
intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
intent.attached =
Some(scheduler.node_reference_attached(NodeId(tsp.generation_pageserver as u64)))
}
let new_tenant = TenantState {
@@ -1468,7 +1471,10 @@ impl Service {
observed_loc.conf = None;
}
if tenant_state.intent.notify_offline(config_req.node_id) {
if tenant_state
.intent
.notify_offline(&scheduler, config_req.node_id)
{
tenant_state.sequence = tenant_state.sequence.next();
match tenant_state.schedule(&mut scheduler) {
Err(e) => {
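
For the startup path above, the persisted generation_pageserver column appears to use i64::MAX as the "no attached pageserver" sentinel, with any other value rehydrated into a scheduler-tracked attachment. A small sketch of just that sentinel handling, using a hypothetical helper and simplified types:

#[derive(Clone, Copy, Debug, PartialEq)]
struct NodeId(u64);

// Hypothetical helper: maps the persisted column to an optional attached node,
// assuming i64::MAX means "no generation was ever issued for this shard".
fn attached_from_persistence(generation_pageserver: i64) -> Option<NodeId> {
    if generation_pageserver == i64::MAX {
        None
    } else {
        Some(NodeId(generation_pageserver as u64))
    }
}

fn main() {
    assert_eq!(attached_from_persistence(i64::MAX), None);
    assert_eq!(attached_from_persistence(7), Some(NodeId(7)));
}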

View File

@@ -17,8 +17,10 @@ use crate::{
compute_hook::ComputeHook,
node::Node,
persistence::Persistence,
reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
scheduler::{ScheduleError, Scheduler},
reconciler::{
self, attached_location_conf, secondary_location_conf, ReconcileError, Reconciler,
},
scheduler::{NodeAttached, NodeSecondary, ScheduleError, Scheduler, ShardConstraints},
service, PlacementPolicy, Sequence,
};
@@ -73,10 +75,14 @@ pub(crate) struct TenantState {
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
}
#[derive(Default, Clone, Debug)]
/// This is like a NodeId, but carries an Rc reference to a SchedulerNode, so that
/// its existence implicitly keeps the scheduler's reference count for that node incremented.
struct NodeSchedulingRef {}
#[derive(Default, Debug)]
pub(crate) struct IntentState {
pub(crate) attached: Option<NodeId>,
pub(crate) secondary: Vec<NodeId>,
pub(crate) attached: Option<NodeAttached>,
pub(crate) secondary: Vec<NodeSecondary>,
}
#[derive(Default, Clone)]
@@ -176,10 +182,10 @@ impl IntentState {
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
let mut result = Vec::new();
if let Some(p) = self.attached {
result.push(p)
result.push(p.id())
}
result.extend(self.secondary.iter().copied());
result.extend(self.secondary.iter().map(|s| s.id()));
result
}
@@ -188,13 +194,13 @@ impl IntentState {
/// as their attached pageserver.
///
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
self.attached = None;
self.secondary.push(node_id);
true
} else {
false
pub(crate) fn notify_offline(&mut self, scheduler: &Scheduler, node_id: NodeId) -> bool {
match self.attached.take() {
Some(node_attached) if node_attached.id() == node_id => {
self.secondary.push(scheduler.node_downgrade(node_attached));
true
}
_ => false,
}
}
}
@@ -233,7 +239,7 @@ impl TenantState {
/// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
/// to get an intent state that complies with placement policy. The overall goal is to do scheduling
/// in a way that makes use of any configured locations that already exist in the outside world.
pub(crate) fn intent_from_observed(&mut self) {
pub(crate) fn intent_from_observed(&mut self, scheduler: &Scheduler) {
// Choose an attached location by filtering observed locations, and then sorting to get the highest
// generation
let mut attached_locs = self
@@ -258,15 +264,17 @@ impl TenantState {
attached_locs.sort_by_key(|i| i.1);
if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
self.intent.attached = Some(*node_id);
self.intent.attached = Some(scheduler.node_reference_attached(*node_id));
}
// All remaining observed locations generate secondary intents. This includes None
// observations, as these may well have some local content on disk that is usable (this
// is an edge case that might occur if we restarted during a migration or other change)
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.secondary.push(*node_id);
if Some(*node_id) != self.intent.attached.map(|a| a.id()) {
self.intent
.secondary
.push(scheduler.node_reference_secondary(*node_id));
}
});
}
@@ -278,7 +286,8 @@ impl TenantState {
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut used_pageservers = self.intent.all_pageservers();
let mut constraints = ShardConstraints::new();
constraints.push_anti_affinity(self.intent.all_pageservers());
let mut modified = false;
use PlacementPolicy::*;
@@ -286,9 +295,9 @@ impl TenantState {
Single => {
// Should have exactly one attached, and zero secondaries
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
let node_id = scheduler.schedule_shard_attached(&constraints)?;
constraints.push_anti_attached(&node_id);
self.intent.attached = Some(node_id);
used_pageservers.push(node_id);
modified = true;
}
if !self.intent.secondary.is_empty() {
@@ -299,16 +308,16 @@ impl TenantState {
Double(secondary_count) => {
// Should have exactly one attached, and N secondaries
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
let node_id = scheduler.schedule_shard_attached(&constraints)?;
constraints.push_anti_attached(&node_id);
self.intent.attached = Some(node_id);
used_pageservers.push(node_id);
modified = true;
}
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
let node_id = scheduler.schedule_shard_secondary(&constraints)?;
constraints.push_anti_secondary(&node_id);
self.intent.secondary.push(node_id);
used_pageservers.push(node_id);
modified = true;
}
}
@@ -334,9 +343,9 @@ impl TenantState {
}
fn dirty(&self) -> bool {
if let Some(node_id) = self.intent.attached {
if let Some(node_attached) = self.intent.attached {
let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
match self.observed.locations.get(&node_attached.id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
return true;
@@ -344,9 +353,9 @@ impl TenantState {
}
}
for node_id in &self.intent.secondary {
for node_secondary in &self.intent.secondary {
let wanted_conf = secondary_location_conf(&self.shard, &self.config);
match self.observed.locations.get(node_id) {
match self.observed.locations.get(&node_secondary.id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
return true;
@@ -407,7 +416,7 @@ impl TenantState {
tenant_shard_id: self.tenant_shard_id,
shard: self.shard,
generation: self.generation,
intent: self.intent.clone(),
intent: reconciler::TargetState::new(&self.intent),
config: self.config.clone(),
observed: self.observed.clone(),
pageservers: pageservers.clone(),
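
TenantState::schedule above seeds a ShardConstraints with every pageserver the shard already uses and then extends it after each placement, so attached and secondary locations never share a node. A self-contained sketch of that anti-affinity loop, with simplified hypothetical types and a least-loaded pick standing in for the scheduler's real selection:

use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct NodeId(u64);

#[derive(Default)]
struct ShardConstraints {
    anti_affinity: Vec<NodeId>,
}

struct Scheduler {
    // node -> current number of scheduled locations
    load: HashMap<NodeId, usize>,
}

impl Scheduler {
    // Pick the least-loaded node that is not excluded by the constraints.
    fn schedule_shard(&mut self, constraints: &ShardConstraints) -> Option<NodeId> {
        let (&node_id, _) = self
            .load
            .iter()
            .filter(|(n, _)| !constraints.anti_affinity.contains(*n))
            .min_by_key(|(_, load)| **load)?;
        *self.load.get_mut(&node_id).unwrap() += 1;
        Some(node_id)
    }
}

fn main() {
    let mut scheduler = Scheduler {
        load: HashMap::from([(NodeId(1), 0), (NodeId(2), 0), (NodeId(3), 0)]),
    };
    let mut constraints = ShardConstraints::default();

    let attached = scheduler.schedule_shard(&constraints).unwrap();
    // Record the attached location so later placements avoid it.
    constraints.anti_affinity.push(attached);

    let secondary = scheduler.schedule_shard(&constraints).unwrap();
    // The secondary never lands on the attached node.
    assert_ne!(attached, secondary);
}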