Mirror of https://github.com/neondatabase/neon.git
control_plane/attachment_service: better Scheduler
@@ -26,7 +26,7 @@ pub(super) struct Reconciler {
     pub(super) tenant_shard_id: TenantShardId,
     pub(crate) shard: ShardIdentity,
     pub(crate) generation: Generation,
-    pub(crate) intent: IntentState,
+    pub(crate) intent: TargetState,
     pub(crate) config: TenantConfig,
     pub(crate) observed: ObservedState,
 
@@ -57,6 +57,32 @@ pub(super) struct Reconciler {
     pub(crate) persistence: Arc<Persistence>,
 }
 
+/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any
+/// reference counting for Scheduler. The IntentState is what the scheduler works with,
+/// and the TargetState is just the instruction for a particular Reconciler run.
+#[derive(Debug)]
+pub(crate) struct TargetState {
+    pub(crate) attached: Option<NodeId>,
+    pub(crate) secondary: Vec<NodeId>,
+}
+
+impl TargetState {
+    pub(crate) fn from_intent(intent: &IntentState) -> Self {
+        Self {
+            attached: *intent.get_attached(),
+            secondary: intent.get_secondary().clone(),
+        }
+    }
+
+    fn all_pageservers(&self) -> Vec<NodeId> {
+        let mut result = self.secondary.clone();
+        if let Some(node_id) = &self.attached {
+            result.push(*node_id);
+        }
+        result
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub(crate) enum ReconcileError {
     #[error(transparent)]
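
Aside: a standalone sketch of the snapshot idea behind TargetState::from_intent above, using simplified stand-in types rather than the real crate. The point is that the reconcile task receives plain data with no Scheduler bookkeeping attached, so it can hold, move, or drop its target without touching any reference counts.

    // Sketch only: `Intent`, `Target`, and `NodeId` are simplified stand-ins.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct NodeId(u64);

    struct Intent {
        attached: Option<NodeId>,
        secondary: Vec<NodeId>,
    }

    #[derive(Debug)]
    struct Target {
        attached: Option<NodeId>,
        secondary: Vec<NodeId>,
    }

    impl Target {
        // A plain copy of the placement: no scheduler bookkeeping rides along,
        // so the value can be moved into a background task and dropped freely.
        fn from_intent(intent: &Intent) -> Self {
            Self {
                attached: intent.attached,
                secondary: intent.secondary.clone(),
            }
        }
    }

    fn main() {
        let intent = Intent {
            attached: Some(NodeId(1)),
            secondary: vec![NodeId(2)],
        };
        let target = Target::from_intent(&intent);
        println!("reconciler will work towards {target:?}");
    }
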
@@ -1,9 +1,7 @@
-use pageserver_api::shard::TenantShardId;
-use std::collections::{BTreeMap, HashMap};
+use crate::node::Node;
+use std::collections::HashMap;
 use utils::{http::error::ApiError, id::NodeId};
 
-use crate::{node::Node, tenant_state::TenantState};
-
 /// Scenarios in which we cannot find a suitable location for a tenant shard
 #[derive(thiserror::Error, Debug)]
 pub enum ScheduleError {
@@ -19,52 +17,88 @@ impl From<ScheduleError> for ApiError {
     }
 }
 
+struct SchedulerNode {
+    /// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
+    shard_count: usize,
+
+    /// Whether this node is currently elegible to have new shards scheduled (this is derived
+    /// from a node's availability state and scheduling policy).
+    may_schedule: bool,
+}
+
 pub(crate) struct Scheduler {
-    tenant_counts: HashMap<NodeId, usize>,
+    nodes: HashMap<NodeId, SchedulerNode>,
 }
 
 impl Scheduler {
-    pub(crate) fn new(
-        tenants: &BTreeMap<TenantShardId, TenantState>,
-        nodes: &HashMap<NodeId, Node>,
-    ) -> Self {
-        let mut tenant_counts = HashMap::new();
-        for node_id in nodes.keys() {
-            tenant_counts.insert(*node_id, 0);
-        }
-
-        for tenant in tenants.values() {
-            if let Some(ps) = tenant.intent.attached {
-                let entry = tenant_counts.entry(ps).or_insert(0);
-                *entry += 1;
-            }
-        }
-
+    pub(crate) fn new(nodes: &HashMap<NodeId, Node>) -> Self {
+        let mut scheduler_nodes = HashMap::new();
         for (node_id, node) in nodes {
-            if !node.may_schedule() {
-                tenant_counts.remove(node_id);
-            }
+            scheduler_nodes.insert(
+                *node_id,
+                SchedulerNode {
+                    shard_count: 0,
+                    may_schedule: node.may_schedule(),
+                },
+            );
         }
 
-        Self { tenant_counts }
+        Self {
+            nodes: scheduler_nodes,
+        }
     }
 
+    pub(crate) fn node_ref(&mut self, node_id: NodeId) {
+        let Some(node) = self.nodes.get_mut(&node_id) else {
+            debug_assert!(false);
+            tracing::error!("Scheduler missing node {node_id}");
+            return;
+        };
+
+        node.shard_count += 1;
+    }
+
+    pub(crate) fn node_deref(&mut self, node_id: NodeId) {
+        let Some(node) = self.nodes.get_mut(&node_id) else {
+            debug_assert!(false);
+            tracing::error!("Scheduler missing node {node_id}");
+            return;
+        };
+
+        node.shard_count -= 1;
+    }
+
+    pub(crate) fn node_upsert(&mut self, node_id: NodeId, may_schedule: bool) {
+        use std::collections::hash_map::Entry::*;
+        match self.nodes.entry(node_id) {
+            Occupied(mut entry) => {
+                entry.get_mut().may_schedule = may_schedule;
+            }
+            Vacant(entry) => {
+                entry.insert(SchedulerNode {
+                    shard_count: 0,
+                    may_schedule,
+                });
+            }
+        }
+    }
+
     pub(crate) fn schedule_shard(
         &mut self,
         hard_exclude: &[NodeId],
     ) -> Result<NodeId, ScheduleError> {
-        if self.tenant_counts.is_empty() {
+        if self.nodes.is_empty() {
            return Err(ScheduleError::NoPageservers);
        }
 
        let mut tenant_counts: Vec<(NodeId, usize)> = self
-            .tenant_counts
+            .nodes
            .iter()
            .filter_map(|(k, v)| {
-                if hard_exclude.contains(k) {
+                if hard_exclude.contains(k) || !v.may_schedule {
                    None
                } else {
-                    Some((*k, *v))
+                    Some((*k, v.shard_count))
                }
            })
            .collect();
@@ -83,7 +117,10 @@ impl Scheduler {
 
         let node_id = tenant_counts.first().unwrap().0;
         tracing::info!("scheduler selected node {node_id}");
-        *self.tenant_counts.get_mut(&node_id).unwrap() += 1;
+
+        // Note that we do not update shard count here to reflect the scheduling: that
+        // is IntentState's job when the scheduled location is used.
+
         Ok(node_id)
     }
 }
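
Aside: a standalone sketch of the accounting split described above, with simplified types rather than the real Scheduler. schedule_shard() only selects the least-loaded eligible node; the per-node shard count moves only when the caller records the choice in its intent (the node_ref / node_deref calls that IntentState makes).

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct NodeId(u64);

    #[derive(Default)]
    struct MiniScheduler {
        // shard count per node, analogous to SchedulerNode::shard_count
        shard_counts: HashMap<NodeId, usize>,
    }

    impl MiniScheduler {
        fn node_ref(&mut self, node: NodeId) {
            *self.shard_counts.entry(node).or_insert(0) += 1;
        }

        fn node_deref(&mut self, node: NodeId) {
            if let Some(count) = self.shard_counts.get_mut(&node) {
                *count = count.saturating_sub(1);
            }
        }

        // Selection only: pick the least-loaded node that is not excluded,
        // without bumping any count.
        fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Option<NodeId> {
            self.shard_counts
                .iter()
                .filter(|(n, _)| !hard_exclude.contains(n))
                .min_by_key(|(_, c)| **c)
                .map(|(n, _)| *n)
        }
    }

    fn main() {
        let mut scheduler = MiniScheduler::default();
        for id in 1..=3 {
            scheduler.shard_counts.insert(NodeId(id), 0);
        }

        let picked = scheduler.schedule_shard(&[]).expect("nodes exist");
        scheduler.node_ref(picked); // what an intent does when it adopts the node
        scheduler.node_deref(picked); // and when it later drops the node
    }
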
@@ -61,6 +61,8 @@ struct ServiceState {
 
     nodes: Arc<HashMap<NodeId, Node>>,
 
+    scheduler: Scheduler,
+
     compute_hook: Arc<ComputeHook>,
 
     result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
@@ -72,14 +74,26 @@ impl ServiceState {
         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
         nodes: HashMap<NodeId, Node>,
         tenants: BTreeMap<TenantShardId, TenantState>,
+        scheduler: Scheduler,
     ) -> Self {
         Self {
             tenants,
             nodes: Arc::new(nodes),
+            scheduler,
             compute_hook: Arc::new(ComputeHook::new(config)),
             result_tx,
         }
     }
+
+    fn parts_mut(
+        &mut self,
+    ) -> (
+        &mut Arc<HashMap<NodeId, Node>>,
+        &mut BTreeMap<TenantShardId, TenantState>,
+        &mut Scheduler,
+    ) {
+        (&mut self.nodes, &mut self.tenants, &mut self.scheduler)
+    }
 }
 
 #[derive(Clone)]
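
Aside: a standalone sketch, with simplified types, of the borrow-splitting that parts_mut() above enables: the lock guard is dereferenced mutably once, and the resulting &mut state is split into disjoint field borrows, so the tenant map and the scheduler can be mutated inside the same critical section.

    use std::sync::RwLock;

    struct Scheduler {
        total_shards: usize,
    }

    struct State {
        tenants: Vec<String>,
        scheduler: Scheduler,
    }

    impl State {
        // One &mut State, split once into disjoint field borrows.
        fn parts_mut(&mut self) -> (&mut Vec<String>, &mut Scheduler) {
            (&mut self.tenants, &mut self.scheduler)
        }
    }

    fn main() {
        let inner = RwLock::new(State {
            tenants: vec!["shard-0".to_string()],
            scheduler: Scheduler { total_shards: 0 },
        });

        let mut locked = inner.write().unwrap();
        // Writing `&mut locked.tenants` and `&mut locked.scheduler` separately would
        // re-borrow the whole guard twice; splitting through parts_mut() avoids that.
        let (tenants, scheduler) = locked.parts_mut();
        tenants.push("shard-1".to_string());
        scheduler.total_shards = tenants.len();
        println!("{} shards tracked", scheduler.total_shards);
    }
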
@@ -180,8 +194,10 @@ impl Service {
         // Populate intent and observed states for all tenants, based on reported state on pageservers
         let shard_count = {
             let mut locked = self.inner.write().unwrap();
+            let (_nodes, tenants, scheduler) = locked.parts_mut();
+
             for (tenant_shard_id, (node_id, observed_loc)) in observed {
-                let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
+                let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
                     cleanup.push((tenant_shard_id, node_id));
                     continue;
                 };
@@ -193,10 +209,9 @@ impl Service {
             }
 
             // Populate each tenant's intent state
-            let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
-            for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
+            for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
                 tenant_state.intent_from_observed();
-                if let Err(e) = tenant_state.schedule(&mut scheduler) {
+                if let Err(e) = tenant_state.schedule(scheduler) {
                     // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
                     // not enough pageservers are available. The tenant may well still be available
                     // to clients.
@@ -327,6 +342,8 @@ impl Service {
 
         let mut tenants = BTreeMap::new();
 
+        let mut scheduler = Scheduler::new(&nodes);
+
         for tsp in tenant_shard_persistence {
             let tenant_shard_id = TenantShardId {
                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
@@ -347,7 +364,10 @@ impl Service {
             // it with what we can infer: the node for which a generation was most recently issued.
             let mut intent = IntentState::new();
             if tsp.generation_pageserver != i64::MAX {
-                intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
+                intent.set_attached(
+                    &mut scheduler,
+                    Some(NodeId(tsp.generation_pageserver as u64)),
+                );
             }
 
             let new_tenant = TenantState {
@@ -377,6 +397,7 @@ impl Service {
                 result_tx,
                 nodes,
                 tenants,
+                scheduler,
             ))),
             config,
             persistence,
@@ -518,8 +539,9 @@ impl Service {
         };
 
         let mut locked = self.inner.write().unwrap();
-        let tenant_state = locked
-            .tenants
+        let (_nodes, tenants, scheduler) = locked.parts_mut();
+
+        let tenant_state = tenants
             .get_mut(&attach_req.tenant_shard_id)
             .expect("Checked for existence above");
 
@@ -539,7 +561,7 @@ impl Service {
                 generation = ?tenant_state.generation,
                 "issuing",
             );
-        } else if let Some(ps_id) = tenant_state.intent.attached {
+        } else if let Some(ps_id) = tenant_state.intent.get_attached() {
             tracing::info!(
                 tenant_id = %attach_req.tenant_shard_id,
                 %ps_id,
@@ -551,7 +573,9 @@ impl Service {
                 tenant_id = %attach_req.tenant_shard_id,
                 "no-op: tenant already has no pageserver");
         }
-        tenant_state.intent.attached = attach_req.node_id;
+        tenant_state
+            .intent
+            .set_attached(scheduler, attach_req.node_id);
 
         tracing::info!(
             "attach_hook: tenant {} set generation {:?}, pageserver {}",
@@ -576,7 +600,7 @@ impl Service {
         InspectResponse {
             attachment: tenant_state.and_then(|s| {
                 s.intent
-                    .attached
+                    .get_attached()
                     .map(|ps| (s.generation.into().unwrap(), ps))
             }),
         }
@@ -728,16 +752,15 @@ impl Service {
 
         let (waiters, response_shards) = {
             let mut locked = self.inner.write().unwrap();
+            let (_nodes, tenants, scheduler) = locked.parts_mut();
 
             let mut response_shards = Vec::new();
 
-            let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
-
             for tenant_shard_id in create_ids {
                 tracing::info!("Creating shard {tenant_shard_id}...");
 
                 use std::collections::btree_map::Entry;
-                match locked.tenants.entry(tenant_shard_id) {
+                match tenants.entry(tenant_shard_id) {
                     Entry::Occupied(mut entry) => {
                         tracing::info!(
                             "Tenant shard {tenant_shard_id} already exists while creating"
@@ -747,7 +770,7 @@ impl Service {
                         // attached and secondary locations (independently) away frorm those
                         // pageservers also holding a shard for this tenant.
 
-                        entry.get_mut().schedule(&mut scheduler).map_err(|e| {
+                        entry.get_mut().schedule(scheduler).map_err(|e| {
                             ApiError::Conflict(format!(
                                 "Failed to schedule shard {tenant_shard_id}: {e}"
                             ))
@@ -758,7 +781,7 @@ impl Service {
                             node_id: entry
                                 .get()
                                 .intent
-                                .attached
+                                .get_attached()
                                 .expect("We just set pageserver if it was None"),
                             generation: entry.get().generation.into().unwrap(),
                         });
@@ -780,7 +803,7 @@ impl Service {
                         }
                         state.config = create_req.config.clone();
 
-                        state.schedule(&mut scheduler).map_err(|e| {
+                        state.schedule(scheduler).map_err(|e| {
                             ApiError::Conflict(format!(
                                 "Failed to schedule shard {tenant_shard_id}: {e}"
                             ))
@@ -790,7 +813,7 @@ impl Service {
                             shard_id: tenant_shard_id,
                             node_id: state
                                 .intent
-                                .attached
+                                .get_attached()
                                 .expect("We just set pageserver if it was None"),
                             generation: state.generation.into().unwrap(),
                         });
@@ -866,16 +889,11 @@ impl Service {
         let mut locked = self.inner.write().unwrap();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
-        let pageservers = locked.nodes.clone();
-
-        let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
+        let (nodes, tenants, scheduler) = locked.parts_mut();
 
         // Maybe we have existing shards
         let mut create = true;
-        for (shard_id, shard) in locked
-            .tenants
-            .range_mut(TenantShardId::tenant_range(tenant_id))
-        {
+        for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
             // Saw an existing shard: this is not a creation
             create = false;
 
@@ -899,7 +917,7 @@ impl Service {
                 | LocationConfigMode::AttachedSingle
                 | LocationConfigMode::AttachedStale => {
                     // TODO: persistence for changes in policy
-                    if pageservers.len() > 1 {
+                    if nodes.len() > 1 {
                         shard.policy = PlacementPolicy::Double(1)
                     } else {
                         // Convenience for dev/test: if we just have one pageserver, import
@@ -909,11 +927,11 @@ impl Service {
                 }
             }
 
-            shard.schedule(&mut scheduler)?;
+            shard.schedule(scheduler)?;
 
             let maybe_waiter = shard.maybe_reconcile(
                 result_tx.clone(),
-                &pageservers,
+                nodes,
                 &compute_hook,
                 &self.config,
                 &self.persistence,
@@ -922,10 +940,10 @@ impl Service {
                 waiters.push(waiter);
             }
 
-            if let Some(node_id) = shard.intent.attached {
+            if let Some(node_id) = shard.intent.get_attached() {
                 result.shards.push(TenantShardLocation {
                     shard_id: *shard_id,
-                    node_id,
+                    node_id: *node_id,
                 })
             }
         }
@@ -1002,7 +1020,7 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard.intent.attached.ok_or_else(|| {
+            let node_id = shard.intent.get_attached().ok_or_else(|| {
                 ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
             })?;
             let node = locked
@@ -1061,9 +1079,16 @@ impl Service {
         // Drop in-memory state
         {
             let mut locked = self.inner.write().unwrap();
-            locked
-                .tenants
-                .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
+            let (_nodes, tenants, scheduler) = locked.parts_mut();
+
+            // Dereference Scheduler from shards before dropping them
+            for (_tenant_shard_id, shard) in
+                tenants.range_mut(TenantShardId::tenant_range(tenant_id))
+            {
+                shard.intent.clear(scheduler);
+            }
+
+            tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
             tracing::info!(
                 "Deleted tenant {tenant_id}, now have {} tenants",
                 locked.tenants.len()
@@ -1097,7 +1122,7 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard.intent.attached.ok_or_else(|| {
+            let node_id = shard.intent.get_attached().ok_or_else(|| {
                 ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
             })?;
             let node = locked
@@ -1177,7 +1202,7 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard.intent.attached.ok_or_else(|| {
+            let node_id = shard.intent.get_attached().ok_or_else(|| {
                 ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
            })?;
            let node = locked
@@ -1249,13 +1274,13 @@ impl Service {
 
         // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
         // point to somewhere we haven't attached yet.
-        let Some(node_id) = shard.intent.attached else {
+        let Some(node_id) = shard.intent.get_attached() else {
             return Err(ApiError::Conflict(
                 "Cannot call timeline API on non-attached tenant".to_string(),
             ));
         };
 
-        let Some(node) = locked.nodes.get(&node_id) else {
+        let Some(node) = locked.nodes.get(node_id) else {
             // This should never happen
             return Err(ApiError::InternalServerError(anyhow::anyhow!(
                 "Shard refers to nonexistent node"
@@ -1280,12 +1305,13 @@ impl Service {
 
         for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard
-                .intent
-                .attached
-                .ok_or(ApiError::BadRequest(anyhow::anyhow!(
-                    "Cannot locate a tenant that is not attached"
-                )))?;
+            let node_id =
+                shard
+                    .intent
+                    .get_attached()
+                    .ok_or(ApiError::BadRequest(anyhow::anyhow!(
+                        "Cannot locate a tenant that is not attached"
+                    )))?;
 
             let node = pageservers
                 .get(&node_id)
@@ -1349,35 +1375,34 @@ impl Service {
     ) -> Result<TenantShardMigrateResponse, ApiError> {
         let waiter = {
             let mut locked = self.inner.write().unwrap();
-
             let result_tx = locked.result_tx.clone();
-            let pageservers = locked.nodes.clone();
             let compute_hook = locked.compute_hook.clone();
+            let (nodes, tenants, scheduler) = locked.parts_mut();
 
-            let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
+            let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant shard not found").into(),
                 ));
             };
 
-            if shard.intent.attached == Some(migrate_req.node_id) {
+            if shard.intent.get_attached() == &Some(migrate_req.node_id) {
                 // No-op case: we will still proceed to wait for reconciliation in case it is
                 // incomplete from an earlier update to the intent.
                 tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
             } else {
-                let old_attached = shard.intent.attached;
+                let old_attached = *shard.intent.get_attached();
 
                 match shard.policy {
                     PlacementPolicy::Single => {
-                        shard.intent.secondary.clear();
+                        shard.intent.clear_secondary(scheduler);
                     }
                     PlacementPolicy::Double(_n) => {
                         // If our new attached node was a secondary, it no longer should be.
-                        shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
+                        shard.intent.remove_secondary(scheduler, migrate_req.node_id);
 
                         // If we were already attached to something, demote that to a secondary
                         if let Some(old_attached) = old_attached {
-                            shard.intent.secondary.push(old_attached);
+                            shard.intent.push_secondary(scheduler, old_attached);
                         }
                     }
                     PlacementPolicy::Detached => {
@@ -1386,7 +1411,9 @@ impl Service {
                         )))
                     }
                 }
-                shard.intent.attached = Some(migrate_req.node_id);
+                shard
+                    .intent
+                    .set_attached(scheduler, Some(migrate_req.node_id));
 
                 tracing::info!("Migrating: new intent {:?}", shard.intent);
                 shard.sequence = shard.sequence.next();
@@ -1394,7 +1421,7 @@ impl Service {
 
             shard.maybe_reconcile(
                 result_tx,
-                &pageservers,
+                nodes,
                 &compute_hook,
                 &self.config,
                 &self.persistence,
@@ -1478,6 +1505,9 @@ impl Service {
         let mut locked = self.inner.write().unwrap();
         let mut new_nodes = (*locked.nodes).clone();
 
+        locked
+            .scheduler
+            .node_upsert(register_req.node_id, new_node.may_schedule());
         new_nodes.insert(register_req.node_id, new_node);
 
         locked.nodes = Arc::new(new_nodes);
@@ -1494,8 +1524,9 @@ impl Service {
         let mut locked = self.inner.write().unwrap();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
+        let (nodes, tenants, scheduler) = locked.parts_mut();
 
-        let mut new_nodes = (*locked.nodes).clone();
+        let mut new_nodes = (**nodes).clone();
 
         let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
             return Err(ApiError::NotFound(
@@ -1531,11 +1562,13 @@ impl Service {
             // to wake up and start working.
         }
 
+        // Update the scheduler, in case the elegibility of the node for new shards has changed
+        scheduler.node_upsert(node.id, node.may_schedule());
+
         let new_nodes = Arc::new(new_nodes);
 
-        let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
         if offline_transition {
-            for (tenant_shard_id, tenant_state) in &mut locked.tenants {
+            for (tenant_shard_id, tenant_state) in tenants {
                 if let Some(observed_loc) =
                     tenant_state.observed.locations.get_mut(&config_req.node_id)
                 {
@@ -1546,7 +1579,7 @@ impl Service {
 
                 if tenant_state.intent.notify_offline(config_req.node_id) {
                     tenant_state.sequence = tenant_state.sequence.next();
-                    match tenant_state.schedule(&mut scheduler) {
+                    match tenant_state.schedule(scheduler) {
                         Err(e) => {
                             // It is possible that some tenants will become unschedulable when too many pageservers
                             // go offline: in this case there isn't much we can do other than make the issue observable.
@@ -1605,18 +1638,14 @@ impl Service {
         let mut waiters = Vec::new();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
-        let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
-        let pageservers = locked.nodes.clone();
+        let (nodes, tenants, scheduler) = locked.parts_mut();
 
-        for (_tenant_shard_id, shard) in locked
-            .tenants
-            .range_mut(TenantShardId::tenant_range(tenant_id))
-        {
-            shard.schedule(&mut scheduler)?;
+        for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+            shard.schedule(scheduler)?;
 
             if let Some(waiter) = shard.maybe_reconcile(
                 result_tx.clone(),
-                &pageservers,
+                nodes,
                 &compute_hook,
                 &self.config,
                 &self.persistence,
@@ -17,7 +17,9 @@ use crate::{
     compute_hook::ComputeHook,
     node::Node,
     persistence::Persistence,
-    reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
+    reconciler::{
+        attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
+    },
     scheduler::{ScheduleError, Scheduler},
     service, PlacementPolicy, Sequence,
 };
@@ -81,8 +83,97 @@ pub(crate) struct TenantState {
 
 #[derive(Default, Clone, Debug)]
 pub(crate) struct IntentState {
-    pub(crate) attached: Option<NodeId>,
-    pub(crate) secondary: Vec<NodeId>,
+    attached: Option<NodeId>,
+    secondary: Vec<NodeId>,
 }
 
+impl IntentState {
+    pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
+        if self.attached != new_attached {
+            if let Some(old_attached) = self.attached.take() {
+                scheduler.node_deref(old_attached);
+            }
+            if let Some(new_attached) = &new_attached {
+                scheduler.node_ref(*new_attached);
+            }
+            self.attached = new_attached;
+        }
+    }
+
+    pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
+        debug_assert!(!self.secondary.contains(&new_secondary));
+        scheduler.node_ref(new_secondary);
+        self.secondary.push(new_secondary);
+    }
+
+    /// It is legal to call this with a node that is not currently a secondary: that is a no-op
+    pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
+        let index = self.secondary.iter().position(|n| *n == node_id);
+        if let Some(index) = index {
+            scheduler.node_deref(node_id);
+            self.secondary.remove(index);
+        }
+    }
+
+    pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
+        for secondary in self.secondary.drain(..) {
+            scheduler.node_deref(secondary);
+        }
+    }
+
+    pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
+        if let Some(old_attached) = self.attached.take() {
+            scheduler.node_deref(old_attached);
+        }
+
+        self.clear_secondary(scheduler);
+    }
+
+    pub(crate) fn new() -> Self {
+        Self {
+            attached: None,
+            secondary: vec![],
+        }
+    }
+    pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
+        let mut result = Vec::new();
+        if let Some(p) = self.attached {
+            result.push(p)
+        }
+
+        result.extend(self.secondary.iter().copied());
+
+        result
+    }
+
+    pub(crate) fn get_attached(&self) -> &Option<NodeId> {
+        &self.attached
+    }
+
+    pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
+        &self.secondary
+    }
+
+    /// When a node goes offline, we update intents to avoid using it
+    /// as their attached pageserver.
+    ///
+    /// Returns true if a change was made
+    pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
+        if self.attached == Some(node_id) {
+            self.attached = None;
+            self.secondary.push(node_id);
+            true
+        } else {
+            false
+        }
+    }
+}
+
+impl Drop for IntentState {
+    fn drop(&mut self) {
+        // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
+        debug_assert!(self.attached.is_none() && self.secondary.is_empty());
+    }
+}
+
 #[derive(Default, Clone)]
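
Aside: a standalone sketch, with simplified types, of why an intent must be explicitly cleared against the scheduler before it is dropped. A Drop impl can only assert emptiness, because the destructor has no access to the Scheduler it would need in order to call node_deref.

    use std::collections::HashMap;

    struct Counts(HashMap<u64, usize>);

    struct Intent {
        attached: Option<u64>,
    }

    impl Intent {
        // Release the reference while the caller still has the scheduler at hand.
        fn clear(&mut self, counts: &mut Counts) {
            if let Some(node) = self.attached.take() {
                if let Some(count) = counts.0.get_mut(&node) {
                    *count -= 1;
                }
            }
        }
    }

    impl Drop for Intent {
        fn drop(&mut self) {
            // Mirrors the diff: dropping a non-empty intent would leak a refcount,
            // so the destructor can only check, not fix.
            debug_assert!(self.attached.is_none());
        }
    }

    fn main() {
        let mut counts = Counts(HashMap::from([(1u64, 1usize)]));
        let mut intent = Intent { attached: Some(1) };
        intent.clear(&mut counts); // must happen before `intent` is dropped
    }
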
@@ -175,39 +266,6 @@ pub(crate) struct ReconcileResult {
     pub(crate) pending_compute_notification: bool,
 }
 
-impl IntentState {
-    pub(crate) fn new() -> Self {
-        Self {
-            attached: None,
-            secondary: vec![],
-        }
-    }
-    pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
-        let mut result = Vec::new();
-        if let Some(p) = self.attached {
-            result.push(p)
-        }
-
-        result.extend(self.secondary.iter().copied());
-
-        result
-    }
-
-    /// When a node goes offline, we update intents to avoid using it
-    /// as their attached pageserver.
-    ///
-    /// Returns true if a change was made
-    pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
-        if self.attached == Some(node_id) {
-            self.attached = None;
-            self.secondary.push(node_id);
-            true
-        } else {
-            false
-        }
-    }
-}
-
 impl ObservedState {
     pub(crate) fn new() -> Self {
         Self {
@@ -297,12 +355,12 @@ impl TenantState {
                 // Should have exactly one attached, and zero secondaries
                 if self.intent.attached.is_none() {
                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
-                    self.intent.attached = Some(node_id);
+                    self.intent.set_attached(scheduler, Some(node_id));
                     used_pageservers.push(node_id);
                     modified = true;
                 }
                 if !self.intent.secondary.is_empty() {
-                    self.intent.secondary.clear();
+                    self.intent.clear_secondary(scheduler);
                     modified = true;
                 }
             }
@@ -310,14 +368,14 @@ impl TenantState {
                 // Should have exactly one attached, and N secondaries
                 if self.intent.attached.is_none() {
                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
-                    self.intent.attached = Some(node_id);
+                    self.intent.set_attached(scheduler, Some(node_id));
                     used_pageservers.push(node_id);
                     modified = true;
                 }
 
                 while self.intent.secondary.len() < secondary_count {
                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
-                    self.intent.secondary.push(node_id);
+                    self.intent.push_secondary(scheduler, node_id);
                     used_pageservers.push(node_id);
                     modified = true;
                 }
@@ -325,12 +383,12 @@ impl TenantState {
             Detached => {
                 // Should have no attached or secondary pageservers
                 if self.intent.attached.is_some() {
-                    self.intent.attached = None;
+                    self.intent.set_attached(scheduler, None);
                     modified = true;
                 }
 
                 if !self.intent.secondary.is_empty() {
-                    self.intent.secondary.clear();
+                    self.intent.clear_secondary(scheduler);
                     modified = true;
                 }
             }
@@ -455,7 +513,7 @@ impl TenantState {
             tenant_shard_id: self.tenant_shard_id,
             shard: self.shard,
             generation: self.generation,
-            intent: self.intent.clone(),
+            intent: TargetState::from_intent(&self.intent),
             config: self.config.clone(),
             observed: self.observed.clone(),
             pageservers: pageservers.clone(),