Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-10 06:00:38 +00:00)

Compare commits (1 commit): add_audit_ ... jcsp/impro

Commit a34e274e08
@@ -18,6 +18,23 @@ use crate::compute_hook::ComputeHook;
 use crate::node::Node;
 use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
 
+/// This is a snapshot of [`crate::tenant_state::IntentState`], but it holds NodeId instead of
+/// Rc<> references to the scheduler.
+#[derive(Debug)]
+pub(crate) struct TargetState {
+    pub(crate) attached: Option<NodeId>,
+    pub(crate) secondary: Vec<NodeId>,
+}
+
+impl TargetState {
+    pub(crate) fn new(intent: &IntentState) -> Self {
+        Self {
+            attached: intent.attached.map(|n| n.id()),
+            secondary: intent.secondary.iter().map(|n| n.id()).collect(),
+        }
+    }
+}
+
 /// Object with the lifetime of the background reconcile task that is created
 /// for tenants which have a difference between their intent and observed states.
 pub(super) struct Reconciler {

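The `TargetState` added above is the reconciler-side snapshot: per its doc comment it copies plain `NodeId`s out of the `Rc<>`-backed intent. One practical reason for such a snapshot is that `Rc` is not `Send`, so only plain data like this can move into the background reconcile task. A minimal sketch of that snapshot-then-spawn pattern, using hypothetical `Target`/`NodeRef`/`Snapshot` types rather than the ones in this diff:

```rust
use std::rc::Rc;

// Hypothetical stand-ins for the AttachedTarget / NodeAttached pair in this diff.
struct Target(u64);
struct NodeRef(Rc<Target>);

impl NodeRef {
    fn id(&self) -> u64 {
        (*self.0).0
    }
}

// Plain-data snapshot: only IDs, no Rc, so it is Send and can move into a task.
#[derive(Debug)]
struct Snapshot {
    attached: Option<u64>,
    secondary: Vec<u64>,
}

fn main() {
    let attached = Some(NodeRef(Rc::new(Target(1))));
    let secondary = vec![NodeRef(Rc::new(Target(2)))];

    // Build the snapshot while still on the thread that owns the Rc references.
    let snapshot = Snapshot {
        attached: attached.as_ref().map(|n| n.id()),
        secondary: secondary.iter().map(|n| n.id()).collect(),
    };

    // Only the snapshot crosses the thread boundary; the Rc-holding values stay behind.
    std::thread::spawn(move || println!("target state: {snapshot:?}"))
        .join()
        .unwrap();
}
```
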
@@ -26,7 +43,7 @@ pub(super) struct Reconciler {
     pub(super) tenant_shard_id: TenantShardId,
     pub(crate) shard: ShardIdentity,
     pub(crate) generation: Generation,
-    pub(crate) intent: IntentState,
+    pub(crate) intent: TargetState,
     pub(crate) config: TenantConfig,
     pub(crate) observed: ObservedState,

@@ -1,8 +1,8 @@
-use pageserver_api::shard::TenantShardId;
-use std::collections::{BTreeMap, HashMap};
+use control_plane::attachment_service::NodeSchedulingPolicy;
+use std::{collections::HashMap, rc::Rc};
 use utils::{http::error::ApiError, id::NodeId};
 
-use crate::{node::Node, tenant_state::TenantState};
+use crate::node::Node;
 
 /// Scenarios in which we cannot find a suitable location for a tenant shard
 #[derive(thiserror::Error, Debug)]

@@ -19,49 +19,170 @@ impl From<ScheduleError> for ApiError {
     }
 }
 
+struct AttachedTarget(NodeId);
+struct SecondaryTarget(NodeId);
+
+/// A reference to a node which has been scheduled for use as an attached location
+pub(crate) struct NodeAttached(Rc<AttachedTarget>);
+/// A reference to a node which has been scheduled for use as a secondary location
+pub(crate) struct NodeSecondary(Rc<SecondaryTarget>);
+
+impl NodeAttached {
+    pub(crate) fn id(&self) -> NodeId {
+        (*self.0).0
+    }
+}
+
+impl std::fmt::Debug for NodeAttached {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.id())
+    }
+}
+
+impl NodeSecondary {
+    pub(crate) fn id(&self) -> NodeId {
+        (*self.0).0
+    }
+}
+
+impl std::fmt::Debug for NodeSecondary {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.id())
+    }
+}
+
+/// What's the difference between [`Node`] and [`SchedulerNode`]?
+/// - The regular Node type is a snapshot of the configuration of an external node: it holds addresses and ports, so
+/// that if you have a Node, you can make pageserver API calls. Nodes get cloned around into
+/// async tasks, and an Arc<> of a map of nodes represents a snapshot of which nodes exist when
+/// starting such a task.
+/// - Scheduler nodes live inside the Scheduler, which lives inside the big Service lock. These are not
+/// a handle for external calls, they're just for keeping track of internal per-node state, like which
+/// shards are intending to attach to which node.
+pub(crate) struct SchedulerNode {
+    node_id: NodeId,
+
+    policy: NodeSchedulingPolicy,
+
+    attached: Rc<AttachedTarget>,
+    secondary: Rc<SecondaryTarget>,
+}
+
+impl Drop for SchedulerNode {
+    fn drop(&mut self) {
+        // Safety check that we will never unhook a node from the scheduler while
+        // some tenants are still referencing it. This is important because tenants
+        // assume that the nodes they reference are always still present.
+        assert!(Rc::<AttachedTarget>::strong_count(&self.attached) == 1);
+        assert!(Rc::<SecondaryTarget>::strong_count(&self.secondary) == 1);
+    }
+}
+
+/// Parameter for scheduling that expresses which nodes a shard _doesn't_ want to go on. Typically
+/// used to avoid scheduling a secondary location on the same node as an attached location, or vice
+/// versa.
+pub(crate) struct ShardConstraints {
+    anti_affinity: Vec<NodeId>,
+}
+
+impl ShardConstraints {
+    pub(crate) fn new() -> Self {
+        Self {
+            anti_affinity: Vec::new(),
+        }
+    }
+    pub(crate) fn push_anti_affinity(&mut self, nodes: Vec<NodeId>) {
+        self.anti_affinity.extend(nodes.into_iter())
+    }
+
+    pub(crate) fn push_anti_attached(&mut self, n: &NodeAttached) {
+        self.anti_affinity.push(n.id())
+    }
+    pub(crate) fn push_anti_secondary(&mut self, n: &NodeSecondary) {
+        self.anti_affinity.push(n.id())
+    }
+}
+
 pub(crate) struct Scheduler {
-    tenant_counts: HashMap<NodeId, usize>,
+    nodes: HashMap<NodeId, SchedulerNode>,
 }
 
 impl Scheduler {
-    pub(crate) fn new(
-        tenants: &BTreeMap<TenantShardId, TenantState>,
-        nodes: &HashMap<NodeId, Node>,
-    ) -> Self {
-        let mut tenant_counts = HashMap::new();
-        for node_id in nodes.keys() {
-            tenant_counts.insert(*node_id, 0);
+    pub(crate) fn new(nodes: &HashMap<NodeId, Node>) -> Self {
+        Self {
+            nodes: nodes
+                .iter()
+                .map(|(node_id, node)| {
+                    (
+                        *node_id,
+                        SchedulerNode {
+                            node_id: *node_id,
+                            attached: Rc::new(AttachedTarget(*node_id)),
+                            secondary: Rc::new(SecondaryTarget(*node_id)),
+                            policy: node.scheduling,
+                        },
+                    )
+                })
+                .collect(),
+        }
-
-        for tenant in tenants.values() {
-            if let Some(ps) = tenant.intent.attached {
-                let entry = tenant_counts.entry(ps).or_insert(0);
-                *entry += 1;
-            }
-        }
-
-        for (node_id, node) in nodes {
-            if !node.may_schedule() {
-                tenant_counts.remove(node_id);
-            }
-        }
-
-        Self { tenant_counts }
     }
 
-    pub(crate) fn schedule_shard(
+    pub(crate) fn node_downgrade(&self, node_attached: NodeAttached) -> NodeSecondary {
+        // unwrap safety: we never drop a SchedulerNode while it is referenced by tenants: see check
+        // in [`SchedulerNode::drop`].
+        let node = self.nodes.get(&node_attached.id()).unwrap();
+        NodeSecondary(node.secondary.clone())
+    }
+
+    /// For use during startup, when we know a node ID and want to increment the scheduler refcounts while
+    /// we populate an IntentState
+    pub(crate) fn node_reference_attached(&self, node_id: NodeId) -> NodeAttached {
+        let node = self.nodes.get(&node_id).unwrap();
+        NodeAttached(node.attached.clone())
+    }
+
+    pub(crate) fn node_reference_secondary(&self, node_id: NodeId) -> NodeSecondary {
+        let node = self.nodes.get(&node_id).unwrap();
+        NodeSecondary(node.secondary.clone())
+    }
+
+    pub(crate) fn schedule_shard_attached(
         &mut self,
-        hard_exclude: &[NodeId],
-    ) -> Result<NodeId, ScheduleError> {
-        if self.tenant_counts.is_empty() {
+        constraints: &ShardConstraints,
+    ) -> Result<NodeAttached, ScheduleError> {
+        let node = self.schedule_shard(constraints)?;
+        Ok(NodeAttached(node.attached.clone()))
+    }
+
+    pub(crate) fn schedule_shard_secondary(
+        &mut self,
+        constraints: &ShardConstraints,
+    ) -> Result<NodeSecondary, ScheduleError> {
+        let node = self.schedule_shard(constraints)?;
+        Ok(NodeSecondary(node.secondary.clone()))
+    }
+
+    fn schedule_shard(
+        &mut self,
+        constraints: &ShardConstraints,
+    ) -> Result<&SchedulerNode, ScheduleError> {
+        if self.nodes.is_empty() {
             return Err(ScheduleError::NoPageservers);
         }
 
-        let mut tenant_counts: Vec<(NodeId, usize)> = self
-            .tenant_counts
+        let mut tenant_counts = HashMap::new();
+        for node in self.nodes.values() {
+            tenant_counts.insert(
+                node.node_id,
+                Rc::<AttachedTarget>::strong_count(&node.attached)
+                    + Rc::<SecondaryTarget>::strong_count(&node.secondary),
+            );
+        }
+
+        let mut tenant_counts: Vec<(NodeId, usize)> = tenant_counts
             .iter()
             .filter_map(|(k, v)| {
-                if hard_exclude.contains(k) {
+                if constraints.anti_affinity.contains(k) {
                     None
                 } else {
                     Some((*k, *v))

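The rewritten `schedule_shard` above no longer maintains an explicit per-node counter: it derives each node's load from `Rc::strong_count` on the node's `attached`/`secondary` handles, since every scheduled shard holds a clone of one of them. A self-contained sketch of that refcount-as-load idea, with hypothetical `MiniScheduler`/`NodeTarget` types standing in for the diff's `Scheduler`/`SchedulerNode`:

```rust
use std::collections::HashMap;
use std::rc::Rc;

// Each node owns one Rc; every shard that schedules onto the node clones it.
struct NodeTarget;

struct MiniScheduler {
    nodes: HashMap<u64, Rc<NodeTarget>>,
}

impl MiniScheduler {
    fn new(node_ids: &[u64]) -> Self {
        Self {
            nodes: node_ids.iter().map(|id| (*id, Rc::new(NodeTarget))).collect(),
        }
    }

    /// Pick the node with the fewest outstanding references and hand back a clone:
    /// the clone itself is what bumps the node's "load".
    fn schedule(&mut self, exclude: &[u64]) -> Option<(u64, Rc<NodeTarget>)> {
        self.nodes
            .iter()
            .filter(|&(id, _)| !exclude.contains(id))
            // strong_count is 1 for an idle node, +1 per shard holding a reference.
            .min_by_key(|&(_, target)| Rc::strong_count(target))
            .map(|(id, target)| (*id, Rc::clone(target)))
    }
}

fn main() {
    let mut scheduler = MiniScheduler::new(&[1, 2]);
    let first = scheduler.schedule(&[]).unwrap();
    // The first node now has strong_count 2, so the next call prefers the other node.
    let second = scheduler.schedule(&[]).unwrap();
    assert_ne!(first.0, second.0);
    println!("scheduled on nodes {} and {}", first.0, second.0);
}
```
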
@@ -84,6 +205,6 @@ impl Scheduler {
         let node_id = tenant_counts.first().unwrap().0;
         tracing::info!("scheduler selected node {node_id}");
-        *self.tenant_counts.get_mut(&node_id).unwrap() += 1;
-        Ok(node_id)
+        Ok(self.nodes.get(&node_id).unwrap())
     }
 }

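The `Drop` impl added to `SchedulerNode` above turns those refcounts into a safety net: a node record may only be dropped once no shard still holds one of its handles, i.e. the strong count is back to 1. A compact sketch of that invariant check, again with hypothetical types rather than the diff's:

```rust
use std::rc::Rc;

struct Target(u64);

/// Scheduler-side record for one node; dropping it while handles are still
/// outstanding indicates a bookkeeping bug, so we assert on drop.
struct NodeRecord {
    attached: Rc<Target>,
}

impl Drop for NodeRecord {
    fn drop(&mut self) {
        // strong_count == 1 means only the record itself still owns the Rc.
        assert_eq!(
            Rc::strong_count(&self.attached),
            1,
            "node {} dropped while shards still reference it",
            self.attached.0
        );
    }
}

fn main() {
    let record = NodeRecord {
        attached: Rc::new(Target(7)),
    };

    // Hand out a reference, as a shard would receive when scheduled here...
    let shard_ref = Rc::clone(&record.attached);

    // ...and release it before the record goes away, keeping the invariant intact.
    drop(shard_ref);
    drop(record); // would panic if `shard_ref` were still alive
    println!("node record dropped cleanly");
}
```
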
@@ -181,7 +181,7 @@ impl Service {
         // Populate each tenant's intent state
         let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
         for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
-            tenant_state.intent_from_observed();
+            tenant_state.intent_from_observed(&scheduler);
             if let Err(e) = tenant_state.schedule(&mut scheduler) {
                 // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
                 // not enough pageservers are available. The tenant may well still be available

@@ -259,6 +259,8 @@ impl Service {
 
         let mut tenants = BTreeMap::new();
 
+        let scheduler = Scheduler::new(&nodes);
+
         for tsp in tenant_shard_persistence {
             let tenant_shard_id = TenantShardId {
                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,

@@ -279,7 +281,8 @@ impl Service {
             // it with what we can infer: the node for which a generation was most recently issued.
             let mut intent = IntentState::new();
             if tsp.generation_pageserver != i64::MAX {
-                intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
+                intent.attached =
+                    Some(scheduler.node_reference_attached(tsp.generation_pageserver as u64))
             }
 
             let new_tenant = TenantState {

@@ -1468,7 +1471,10 @@ impl Service {
                 observed_loc.conf = None;
             }
 
-            if tenant_state.intent.notify_offline(config_req.node_id) {
+            if tenant_state
+                .intent
+                .notify_offline(&scheduler, config_req.node_id)
+            {
                 tenant_state.sequence = tenant_state.sequence.next();
                 match tenant_state.schedule(&mut scheduler) {
                     Err(e) => {

@@ -17,8 +17,10 @@ use crate::{
     compute_hook::ComputeHook,
     node::Node,
     persistence::Persistence,
-    reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
-    scheduler::{ScheduleError, Scheduler},
+    reconciler::{
+        self, attached_location_conf, secondary_location_conf, ReconcileError, Reconciler,
+    },
+    scheduler::{NodeAttached, NodeSecondary, ScheduleError, Scheduler, ShardConstraints},
     service, PlacementPolicy, Sequence,
 };

@@ -73,10 +75,14 @@ pub(crate) struct TenantState {
     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
 }
 
-#[derive(Default, Clone, Debug)]
+/// This is like a NodeId, but carries an Rc reference to a SchedulingNode, so that
+/// its existence implicitly keeps the
+struct NodeSchedulingRef {}
+
+#[derive(Default, Debug)]
 pub(crate) struct IntentState {
-    pub(crate) attached: Option<NodeId>,
-    pub(crate) secondary: Vec<NodeId>,
+    pub(crate) attached: Option<NodeAttached>,
+    pub(crate) secondary: Vec<NodeSecondary>,
 }
 
 #[derive(Default, Clone)]

@@ -176,10 +182,10 @@ impl IntentState {
     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
         let mut result = Vec::new();
         if let Some(p) = self.attached {
-            result.push(p)
+            result.push(p.id())
         }
 
-        result.extend(self.secondary.iter().copied());
+        result.extend(self.secondary.iter().map(|s| s.id()));
 
         result
     }

@@ -188,13 +194,13 @@ impl IntentState {
     /// as their attached pageserver.
     ///
     /// Returns true if a change was made
-    pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
-        if self.attached == Some(node_id) {
-            self.attached = None;
-            self.secondary.push(node_id);
-            true
-        } else {
-            false
+    pub(crate) fn notify_offline(&mut self, scheduler: &Scheduler, node_id: NodeId) -> bool {
+        match self.attached.take() {
+            Some(node_attached) if node_attached.id() == node_id => {
+                self.secondary.push(scheduler.node_downgrade(node_attached));
+                true
+            }
+            _ => false,
         }
     }
 }

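The new `notify_offline` above uses `Option::take()` to move the attached handle out of the intent so it can be consumed by `node_downgrade` and re-inserted as a secondary. A small sketch of that take-and-downgrade pattern with hypothetical `Attached`/`Secondary`/`Intent` types; one difference is that the sketch puts the handle back when the node id does not match, since `take()` alone would otherwise discard it:

```rust
#[derive(Debug, PartialEq)]
struct Attached(u64);
#[derive(Debug, PartialEq)]
struct Secondary(u64);

#[derive(Default, Debug)]
struct Intent {
    attached: Option<Attached>,
    secondary: Vec<Secondary>,
}

impl Intent {
    /// Returns true if the intent changed because `node_id` went offline.
    fn notify_offline(&mut self, node_id: u64) -> bool {
        match self.attached.take() {
            // Only downgrade when the offline node is the one we are attached to.
            Some(attached) if attached.0 == node_id => {
                self.secondary.push(Secondary(attached.0));
                true
            }
            // Otherwise put the handle back and report no change.
            other => {
                self.attached = other;
                false
            }
        }
    }
}

fn main() {
    let mut intent = Intent {
        attached: Some(Attached(3)),
        secondary: Vec::new(),
    };
    assert!(!intent.notify_offline(9)); // unrelated node: no change
    assert!(intent.notify_offline(3)); // attached node offline: downgraded
    assert_eq!(intent.secondary, vec![Secondary(3)]);
    println!("{intent:?}");
}
```
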
@@ -233,7 +239,7 @@ impl TenantState {
     /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
     /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
     /// in a way that makes use of any configured locations that already exist in the outside world.
-    pub(crate) fn intent_from_observed(&mut self) {
+    pub(crate) fn intent_from_observed(&mut self, scheduler: &Scheduler) {
         // Choose an attached location by filtering observed locations, and then sorting to get the highest
         // generation
         let mut attached_locs = self

@@ -258,15 +264,17 @@ impl TenantState {
 
         attached_locs.sort_by_key(|i| i.1);
         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
-            self.intent.attached = Some(*node_id);
+            self.intent.attached = Some(scheduler.node_reference_attached(*node_id));
         }
 
         // All remaining observed locations generate secondary intents. This includes None
         // observations, as these may well have some local content on disk that is usable (this
         // is an edge case that might occur if we restarted during a migration or other change)
         self.observed.locations.keys().for_each(|node_id| {
-            if Some(*node_id) != self.intent.attached {
-                self.intent.secondary.push(*node_id);
+            if Some(*node_id) != self.intent.attached.map(|a| a.id()) {
+                self.intent
+                    .secondary
+                    .push(scheduler.node_reference_secondary(*node_id));
             }
         });
     }

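The selection logic above is unchanged in substance: observed locations are sorted by generation, the highest generation becomes the attached intent, and the remaining observed nodes become secondaries. A tiny standalone illustration, with plain integers standing in for node ids and generations:

```rust
fn main() {
    // (node_id, generation) pairs as they might be observed at startup.
    let mut attached_locs = vec![(1u64, 3u32), (4, 7), (2, 5)];

    // Sort ascending by generation and take the last entry: the most recently
    // issued generation wins the attached role.
    attached_locs.sort_by_key(|i| i.1);
    let attached = attached_locs.last().map(|(node_id, _gen)| *node_id);
    assert_eq!(attached, Some(4));

    // Every other observed node becomes a secondary intent.
    let secondary: Vec<u64> = attached_locs
        .iter()
        .map(|(node_id, _)| *node_id)
        .filter(|node_id| Some(*node_id) != attached)
        .collect();
    assert_eq!(secondary, vec![1, 2]);
    println!("attached: {attached:?}, secondary: {secondary:?}");
}
```
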
@@ -278,7 +286,8 @@ impl TenantState {
 
         // Build the set of pageservers already in use by this tenant, to avoid scheduling
         // more work on the same pageservers we're already using.
-        let mut used_pageservers = self.intent.all_pageservers();
+        let mut constraints = ShardConstraints::new();
+        constraints.push_anti_affinity(self.intent.all_pageservers());
         let mut modified = false;
 
         use PlacementPolicy::*;

@@ -286,9 +295,9 @@ impl TenantState {
             Single => {
                 // Should have exactly one attached, and zero secondaries
                 if self.intent.attached.is_none() {
-                    let node_id = scheduler.schedule_shard(&used_pageservers)?;
+                    let node_id = scheduler.schedule_shard_attached(&constraints)?;
+                    constraints.push_anti_attached(&node_id);
                     self.intent.attached = Some(node_id);
-                    used_pageservers.push(node_id);
                     modified = true;
                 }
                 if !self.intent.secondary.is_empty() {

@@ -299,16 +308,16 @@ impl TenantState {
             Double(secondary_count) => {
                 // Should have exactly one attached, and N secondaries
                 if self.intent.attached.is_none() {
-                    let node_id = scheduler.schedule_shard(&used_pageservers)?;
+                    let node_id = scheduler.schedule_shard_attached(&constraints)?;
+                    constraints.push_anti_attached(&node_id);
                     self.intent.attached = Some(node_id);
-                    used_pageservers.push(node_id);
                     modified = true;
                 }
 
                 while self.intent.secondary.len() < secondary_count {
-                    let node_id = scheduler.schedule_shard(&used_pageservers)?;
+                    let node_id = scheduler.schedule_shard_secondary(&constraints)?;
+                    constraints.push_anti_secondary(&node_id);
                     self.intent.secondary.push(node_id);
-                    used_pageservers.push(node_id);
                     modified = true;
                 }
             }

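`ShardConstraints` replaces the ad-hoc `used_pageservers` vector in the hunks above: after each placement the chosen node is pushed as anti-affinity, so the next placement filters it out. A small sketch of that accumulate-and-filter loop, with hypothetical `Constraints`/`schedule` stand-ins that model only the anti-affinity list, not the Rc handles:

```rust
/// Nodes a shard does not want to share: grows as we place locations.
#[derive(Default)]
struct Constraints {
    anti_affinity: Vec<u64>,
}

impl Constraints {
    fn push(&mut self, node_id: u64) {
        self.anti_affinity.push(node_id);
    }

    fn allows(&self, node_id: u64) -> bool {
        !self.anti_affinity.contains(&node_id)
    }
}

/// Pick the first node that the constraints allow (a stand-in for the real
/// least-loaded choice made by the scheduler).
fn schedule(nodes: &[u64], constraints: &Constraints) -> Option<u64> {
    nodes.iter().copied().find(|n| constraints.allows(*n))
}

fn main() {
    let nodes = [10u64, 11, 12];
    let mut constraints = Constraints::default();

    // Place the attached location, then record it as anti-affinity.
    let attached = schedule(&nodes, &constraints).expect("no nodes available");
    constraints.push(attached);

    // Place two secondaries; each placement excludes everything placed so far.
    let mut secondaries = Vec::new();
    while secondaries.len() < 2 {
        let node = schedule(&nodes, &constraints).expect("not enough nodes");
        constraints.push(node);
        secondaries.push(node);
    }

    assert_eq!(attached, 10);
    assert_eq!(secondaries, vec![11, 12]);
    println!("attached on {attached}, secondaries on {secondaries:?}");
}
```
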
@@ -334,9 +343,9 @@ impl TenantState {
     }
 
     fn dirty(&self) -> bool {
-        if let Some(node_id) = self.intent.attached {
+        if let Some(node_attached) = self.intent.attached {
             let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
-            match self.observed.locations.get(&node_id) {
+            match self.observed.locations.get(&node_attached.id()) {
                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
                 Some(_) | None => {
                     return true;

@@ -344,9 +353,9 @@ impl TenantState {
                 }
             }
         }
 
-        for node_id in &self.intent.secondary {
+        for node_secondary in &self.intent.secondary {
             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
-            match self.observed.locations.get(node_id) {
+            match self.observed.locations.get(&node_secondary.id()) {
                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
                 Some(_) | None => {

@@ -407,7 +416,7 @@ impl TenantState {
             tenant_shard_id: self.tenant_shard_id,
             shard: self.shard,
             generation: self.generation,
-            intent: self.intent.clone(),
+            intent: reconciler::TargetState::new(&self.intent),
             config: self.config.clone(),
             observed: self.observed.clone(),
             pageservers: pageservers.clone(),